[Internal] Per Partition Automatic Failover: Fixes Metadata Requests Retry Policy (#4205)

* Code changes to retry on next preferred region for metadata reads on gateway timeouts.

* Code changes to add retry for PK Ranges call.

* Code changes to mark endpoint unavailable for read when a cosmos exception occurs.

* Code changes to fix unit tests. Added global endpoint manager in Pk Range Cache ctor.

* Code changes to fix unit tests.

* Code changes to fix build break.

* Minor code clean-up.

* Code changes to capture metadata location endpoint within on before send request.

* Code changes to address review comments.

* Code changes to fix build failure.

* Code changes to refactor metadata timeout policy.

* Code changes to add retry for request timeout. Fix emulator tests.

* Code changes to add metadata retry policy unit tests.

* Code changes to add more tests.

* Code changes to refactor metadata retry policy logic to increment location index. Addressed review comments.

* Code changes to address review comments.

* Code changes to address review comments.

* Code changes to add separate condition for pk range requests.
kundadebdatta authored Dec 30, 2023
1 parent 5a096d1 commit c555685
Showing 17 changed files with 578 additions and 72 deletions.
14 changes: 14 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
@@ -98,6 +98,20 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}
}

// Any metadata request will throw a CosmosException from CosmosHttpClientCore if
// it receives a 503 Service Unavailable from the gateway. This check adds a retry
// mechanism for metadata requests in such cases.
if (exception is CosmosException cosmosException)
{
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosException.StatusCode,
cosmosException.Headers.SubStatusCode);
if (shouldRetryResult != null)
{
return shouldRetryResult;
}
}

return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);
}

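For context on the new check in ClientRetryPolicy.ShouldRetryAsync: when the gateway answers a metadata call with 503 Service Unavailable, the HTTP layer surfaces it as a CosmosException rather than a DocumentClientException. The sketch below illustrates that translation in simplified form; it is an assumption-level illustration, not CosmosHttpClientCore's actual code, and the helper name and sub-status choice are hypothetical.

// Illustrative sketch only: how an HTTP-layer 503 can be surfaced as a CosmosException
// whose status and sub-status the retry policies above inspect. Not the SDK's implementation.
private static async Task<HttpResponseMessage> ThrowIfServiceUnavailableAsync(HttpResponseMessage responseMessage)
{
    if (responseMessage.StatusCode == HttpStatusCode.ServiceUnavailable)
    {
        string responseBody = responseMessage.Content == null
            ? string.Empty
            : await responseMessage.Content.ReadAsStringAsync();

        throw new CosmosException(
            message: $"Service is currently unavailable. {responseBody}",
            statusCode: HttpStatusCode.ServiceUnavailable,
            subStatusCode: (int)SubStatusCodes.TransportGenerated503, // assumed sub-status for a gateway/transport 503
            activityId: Guid.NewGuid().ToString(),
            requestCharge: 0);
    }

    return responseMessage;
}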
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
@@ -662,7 +662,7 @@ private async Task OpenPrivateAsync(CancellationToken cancellationToken)
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);

DefaultTrace.TraceWarning("{0} occurred while OpenAsync. Exception Message: {1}", ex.ToString(), ex.Message);
}
@@ -1033,7 +1033,7 @@ private async Task<bool> GetInitializationTaskAsync(IStoreClientFactory storeCli
tokenProvider: this,
retryPolicy: this.retryPolicy,
telemetryToServiceHelper: this.telemetryToServiceHelper);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache, this.GlobalEndpointManager);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);
13 changes: 10 additions & 3 deletions Microsoft.Azure.Cosmos/src/HttpClient/HttpTimeoutPolicy.cs
@@ -29,8 +29,15 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Partition Key Requests
if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange)
//Get Partition Key Range Requests
if (documentServiceRequest.ResourceType == ResourceType.PartitionKeyRange
&& documentServiceRequest.OperationType == OperationType.ReadFeed)
{
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Get Addresses Requests
if (documentServiceRequest.ResourceType == ResourceType.Address)
{
return HttpTimeoutPolicyControlPlaneRetriableHotPath.Instance;
}
@@ -44,7 +51,7 @@ public static HttpTimeoutPolicy GetTimeoutPolicy(
//Meta Data Read
if (HttpTimeoutPolicy.IsMetaData(documentServiceRequest) && documentServiceRequest.IsReadOnlyRequest)
{
return HttpTimeoutPolicyDefault.InstanceShouldThrow503OnTimeout;
return HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout;
}

//Default behavior
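The switch to HttpTimeoutPolicyControlPlaneRetriableHotPath.InstanceShouldThrow503OnTimeout means metadata reads now follow the control-plane hot path's per-attempt timeout/delay schedule and surface an exhausted schedule as a 503, so the retry policies above can fail the request over to another region. A minimal sketch of how such a cascading (timeout, delay) policy is consumed, under assumed names (the SendWithTimeoutsAsync helper and its parameters are illustrative, not SDK APIs):

// Minimal sketch of a per-attempt timeout loop driven by a (timeout, delay) schedule.
// The shape mirrors HttpTimeoutPolicy conceptually; the surrounding types are simplified assumptions.
internal static async Task<HttpResponseMessage> SendWithTimeoutsAsync(
    HttpClient client,
    Func<HttpRequestMessage> createRequest,                      // a fresh request per attempt
    IReadOnlyList<(TimeSpan Timeout, TimeSpan Delay)> attempts,
    bool shouldThrow503OnTimeout,
    CancellationToken cancellationToken)
{
    OperationCanceledException lastTimeout = null;

    foreach ((TimeSpan timeout, TimeSpan delay) in attempts)
    {
        using CancellationTokenSource attemptCts =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        attemptCts.CancelAfter(timeout);

        try
        {
            return await client.SendAsync(createRequest(), attemptCts.Token);
        }
        catch (OperationCanceledException oce) when (!cancellationToken.IsCancellationRequested)
        {
            // This attempt timed out; wait the policy's delay, then try the next pair.
            lastTimeout = oce;
            await Task.Delay(delay, cancellationToken);
        }
    }

    if (shouldThrow503OnTimeout)
    {
        // Surfacing exhausted timeouts as a 503 lets upstream retry policies treat the
        // failure as ServiceUnavailable and retry against the next preferred region.
        throw new CosmosException(
            message: "Request timed out after exhausting the HTTP timeout policy.",
            statusCode: HttpStatusCode.ServiceUnavailable,
            subStatusCode: 0,
            activityId: Guid.NewGuid().ToString(),
            requestCharge: 0);
    }

    throw lastTimeout ?? new OperationCanceledException(cancellationToken);
}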
190 changes: 190 additions & 0 deletions Microsoft.Azure.Cosmos/src/MetadataRequestThrottleRetryPolicy.cs
@@ -0,0 +1,190 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
/// Metadata Request Throttle Retry Policy is a combination of endpoint change retry and throttling retry.
/// </summary>
internal sealed class MetadataRequestThrottleRetryPolicy : IDocumentClientRetryPolicy
{
/// <summary>
/// A constant integer defining the default maximum retry wait time in seconds.
/// </summary>
private const int DefaultMaxWaitTimeInSeconds = 60;

/// <summary>
/// A constant integer defining the default maximum retry count on service unavailable.
/// </summary>
private const int DefaultMaxServiceUnavailableRetryCount = 1;

/// <summary>
/// An instance of <see cref="IGlobalEndpointManager"/>.
/// </summary>
private readonly IGlobalEndpointManager globalEndpointManager;

/// <summary>
/// Defines the throttling retry policy that is used as the underlying retry policy.
/// </summary>
private readonly IDocumentClientRetryPolicy throttlingRetryPolicy;

/// <summary>
/// An integer defining the maximum retry count on service unavailable.
/// </summary>
private readonly int maxServiceUnavailableRetryCount;

/// <summary>
/// An instance of <see cref="MetadataRetryContext"/> capturing the location index and routing
/// directive used to resolve the endpoint for the next metadata request attempt.
/// </summary>
private MetadataRetryContext retryContext;

/// <summary>
/// An integer capturing the current retry count on service unavailable.
/// </summary>
private int serviceUnavailableRetryCount;

/// <summary>
/// The constructor to initialize an instance of <see cref="MetadataRequestThrottleRetryPolicy"/>.
/// </summary>
/// <param name="endpointManager">An instance of <see cref="GlobalEndpointManager"/></param>
/// <param name="maxRetryAttemptsOnThrottledRequests">An integer defining the maximum number
/// of attempts to retry when requests are throttled.</param>
/// <param name="maxRetryWaitTimeInSeconds">An integer defining the maximum wait time in seconds.</param>
public MetadataRequestThrottleRetryPolicy(
IGlobalEndpointManager endpointManager,
int maxRetryAttemptsOnThrottledRequests,
int maxRetryWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds)
{
this.globalEndpointManager = endpointManager;
this.maxServiceUnavailableRetryCount = Math.Max(
MetadataRequestThrottleRetryPolicy.DefaultMaxServiceUnavailableRetryCount,
this.globalEndpointManager.PreferredLocationCount);

this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
maxRetryWaitTimeInSeconds);

this.retryContext = new MetadataRetryContext
{
RetryLocationIndex = 0,
RetryRequestOnPreferredLocations = true,
};
}

/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="exception">Exception that occured when the operation was tried</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
if (exception is CosmosException cosmosException
&& cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable
&& cosmosException.Headers.SubStatusCode == SubStatusCodes.TransportGenerated503)
{
if (this.IncrementRetryIndexOnServiceUnavailableForMetadataRead())
{
return Task.FromResult(ShouldRetryResult.RetryAfter(TimeSpan.Zero));
}
}

return this.throttlingRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

/// <summary>
/// Should the caller retry the operation.
/// </summary>
/// <param name="cosmosResponseMessage"><see cref="ResponseMessage"/> in return of the request</param>
/// <param name="cancellationToken">An instance of <see cref="CancellationToken"/>.</param>
/// <returns>True indicates caller should retry, False otherwise</returns>
public Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
if (cosmosResponseMessage?.StatusCode == HttpStatusCode.ServiceUnavailable
&& cosmosResponseMessage?.Headers?.SubStatusCode == SubStatusCodes.TransportGenerated503)
{
if (this.IncrementRetryIndexOnServiceUnavailableForMetadataRead())
{
return Task.FromResult(ShouldRetryResult.RetryAfter(TimeSpan.Zero));
}
}

return this.throttlingRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

/// <summary>
/// Method that is called before a request is sent to allow the retry policy implementation
/// to modify the state of the request.
/// </summary>
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
// Clear the previous location-based routing directive.
request.RequestContext.ClearRouteToLocation();
request.RequestContext.RouteToLocation(
this.retryContext.RetryLocationIndex,
this.retryContext.RetryRequestOnPreferredLocations);

Uri metadataLocationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request);

DefaultTrace.TraceInformation("MetadataRequestThrottleRetryPolicy: Routing the metadata request to: {0} for operation type: {1} and resource type: {2}.", metadataLocationEndpoint, request.OperationType, request.ResourceType);
request.RequestContext.RouteToLocation(metadataLocationEndpoint);
}

/// <summary>
/// Increments the location index when a service unavailable exception occurs, so that subsequent read requests are routed to the next preferred location.
/// </summary>
/// <returns>A boolean flag indicating if the operation was successful.</returns>
private bool IncrementRetryIndexOnServiceUnavailableForMetadataRead()
{
if (this.serviceUnavailableRetryCount++ >= this.maxServiceUnavailableRetryCount)
{
DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Retry count: {0} has exceeded the maximum permitted retry count on service unavailable: {1}.", this.serviceUnavailableRetryCount, this.maxServiceUnavailableRetryCount);
return false;
}

// Retry on the next preferred location. The service unavailable retry count is used
// as a zero-based index into the preferred locations.
DefaultTrace.TraceWarning("MetadataRequestThrottleRetryPolicy: Incrementing the metadata retry location index to: {0}.", this.serviceUnavailableRetryCount);
this.retryContext = new MetadataRetryContext()
{
RetryLocationIndex = this.serviceUnavailableRetryCount,
RetryRequestOnPreferredLocations = true,
};

return true;
}

/// <summary>
/// A helper class containing the required attributes for
/// metadata retry context.
/// </summary>
internal sealed class MetadataRetryContext
{
/// <summary>
/// An integer defining the current retry location index.
/// </summary>
public int RetryLocationIndex { get; set; }

/// <summary>
/// A boolean flag indicating if the request should retry on
/// preferred locations.
/// </summary>
public bool RetryRequestOnPreferredLocations { get; set; }
}
}
}
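How the policy is consumed, as a simplified sketch of the execute-with-retry contract: OnBeforeSendRequest before each attempt, ShouldRetryAsync after each failure. The real driver is BackoffRetryUtility, which handles more cases; this sketch assumes ShouldRetryResult exposes the ShouldRetry and BackoffTime members used below.

// Simplified sketch of the loop that drives an IDocumentClientRetryPolicy such as
// MetadataRequestThrottleRetryPolicy. Illustrative only; not the SDK's BackoffRetryUtility.
internal static async Task<DocumentServiceResponse> ExecuteWithRetrySketchAsync(
    Func<DocumentServiceRequest> createRequest,
    Func<DocumentServiceRequest, Task<DocumentServiceResponse>> executeAsync,
    IDocumentClientRetryPolicy retryPolicy,
    CancellationToken cancellationToken)
{
    while (true)
    {
        cancellationToken.ThrowIfCancellationRequested();

        DocumentServiceRequest request = createRequest();

        // Lets the policy stamp the preferred-location routing directive (location index,
        // route-to-preferred-locations flag) on the request before endpoint resolution.
        retryPolicy.OnBeforeSendRequest(request);

        try
        {
            return await executeAsync(request);
        }
        catch (Exception exception)
        {
            ShouldRetryResult shouldRetry = await retryPolicy.ShouldRetryAsync(exception, cancellationToken);
            if (!shouldRetry.ShouldRetry)
            {
                throw;
            }

            // For a transport-generated 503 the policy returns a zero backoff and advances the
            // preferred-location index; throttling (429) falls through to the inner policy's delay.
            await Task.Delay(shouldRetry.BackoffTime, cancellationToken);
        }
    }
}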
62 changes: 24 additions & 38 deletions Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs
@@ -30,17 +30,20 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider;
private readonly IStoreModel storeModel;
private readonly CollectionCache collectionCache;
private readonly IGlobalEndpointManager endpointManager;

public PartitionKeyRangeCache(
ICosmosAuthorizationTokenProvider authorizationTokenProvider,
IStoreModel storeModel,
CollectionCache collectionCache)
CollectionCache collectionCache,
IGlobalEndpointManager endpointManager)
{
this.routingMapCache = new AsyncCacheNonBlocking<string, CollectionRoutingMap>(
keyEqualityComparer: StringComparer.Ordinal);
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeModel = storeModel;
this.collectionCache = collectionCache;
this.endpointManager = endpointManager;
}

public virtual async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRangesAsync(
@@ -121,10 +124,10 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
return await this.routingMapCache.GetAsync(
key: collectionRid,
singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync(
collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics),
collectionRid: collectionRid,
previousRoutingMap: previousValue,
trace: trace,
clientSideRequestStatistics: request?.RequestContext?.ClientRequestStatistics),
forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue));
}
catch (DocumentClientException ex)
@@ -174,35 +177,6 @@ private static bool ShouldForceRefresh(
return previousValue.ChangeFeedNextIfNoneMatch == currentValue.ChangeFeedNextIfNoneMatch;
}

public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(string collectionRid,
string partitionKeyRangeId,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics)
{
try
{
CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync(
key: collectionRid,
singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync(
collectionRid: collectionRid,
previousRoutingMap: null,
trace: trace,
clientSideRequestStatistics: clientSideRequestStatistics),
forceRefresh: (_) => false);

return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId);
}
catch (DocumentClientException ex)
{
if (ex.StatusCode == HttpStatusCode.NotFound)
{
return null;
}

throw;
}
}

private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string collectionRid,
CollectionRoutingMap previousRoutingMap,
@@ -213,6 +187,12 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string changeFeedNextIfNoneMatch = previousRoutingMap?.ChangeFeedNextIfNoneMatch;

HttpStatusCode lastStatusCode = HttpStatusCode.OK;

RetryOptions retryOptions = new RetryOptions();
MetadataRequestThrottleRetryPolicy metadataRetryPolicy = new (
endpointManager: this.endpointManager,
maxRetryAttemptsOnThrottledRequests: retryOptions.MaxRetryAttemptsOnThrottledRequests,
maxRetryWaitTimeInSeconds: retryOptions.MaxRetryWaitTimeInSeconds);
do
{
INameValueCollection headers = new RequestNameValueCollection();
@@ -224,10 +204,9 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
headers.Set(HttpConstants.HttpHeaders.IfNoneMatch, changeFeedNextIfNoneMatch);
}

RetryOptions retryOptions = new RetryOptions();
using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics),
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds)))
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics, metadataRetryPolicy),
retryPolicy: metadataRetryPolicy))
{
lastStatusCode = response.StatusCode;
changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag];
@@ -274,7 +253,8 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFeedAsync(string collectionRid,
INameValueCollection headers,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics)
IClientSideRequestStatistics clientSideRequestStatistics,
IDocumentClientRetryPolicy retryPolicy)
{
using (ITrace childTrace = trace.StartChild("Read PartitionKeyRange Change Feed", TraceComponent.Transport, Tracing.TraceLevel.Info))
{
@@ -285,6 +265,7 @@ private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFe
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
retryPolicy.OnBeforeSendRequest(request);
string authorizationToken = null;
try
{
@@ -333,6 +314,11 @@ private async Task<DocumentServiceResponse> ExecutePartitionKeyRangeReadChangeFe
childTrace.AddDatum("Exception Message", ex.Message);
throw;
}
catch (CosmosException ce)
{
childTrace.AddDatum("Exception Message", ce.Message);
throw;
}
}
}
}
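For callers that build the cache directly (for example, tests), the new constructor parameter simply threads the client's endpoint manager through. A construction sketch with illustrative variable names; DocumentClient wires the same dependencies internally:

// Sketch: constructing the cache with the endpoint manager so partition key range
// (ReadFeed on pkranges) metadata reads can fail over across preferred regions.
PartitionKeyRangeCache partitionKeyRangeCache = new PartitionKeyRangeCache(
    authorizationTokenProvider: cosmosAuthorizationTokenProvider,  // ICosmosAuthorizationTokenProvider
    storeModel: gatewayStoreModel,                                 // IStoreModel serving metadata reads
    collectionCache: clientCollectionCache,                        // CollectionCache
    endpointManager: globalEndpointManager);                       // IGlobalEndpointManager (new parameter)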