ClientRetryPolicy: Adds Cross Regional Retry Logic on 429/3092 and 410/1022 #4677

Closed
2 changes: 1 addition & 1 deletion Directory.Build.props
@@ -3,7 +3,7 @@
<ClientOfficialVersion>3.43.0</ClientOfficialVersion>
<ClientPreviewVersion>3.44.0</ClientPreviewVersion>
<ClientPreviewSuffixVersion>preview.0</ClientPreviewSuffixVersion>
<DirectVersion>3.35.0</DirectVersion>
Member
410/1022 is coming directly from the Direct package, right? No changes are needed for it.
How about a separate PR with just that scope?

In general, that's the pattern we follow: a Direct revision brings its features along with it.

Member Author
Makes sense. However, if we are splitting this PR into two (410/1022 and 429/3092), then the Direct PR needs to go first, since this work (429/3092) depends on the sub status codes, which originate from the Direct package.

<DirectVersion>3.36.0</DirectVersion>
<EncryptionOfficialVersion>2.0.4</EncryptionOfficialVersion>
<EncryptionPreviewVersion>2.1.0</EncryptionPreviewVersion>
<EncryptionPreviewSuffixVersion>preview4</EncryptionPreviewSuffixVersion>
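For context on the Direct dependency discussed above: the new 429/3092 handling keys off a sub status code that ships with the updated Direct package, which is why the DirectVersion bump has to land first. A minimal sketch of the check that dependency enables, assuming the enum value maps to 3092 as the PR title and trace messages indicate (the helper class and method names are illustrative, not part of the SDK, and SubStatusCodes is internal to the SDK's assemblies):

using System.Net;
using Microsoft.Azure.Documents;

internal static class ThrottleClassificationSketch
{
    // Illustrative helper: true when a throttled response carries the
    // "system resource not available" sub status (429/3092).
    internal static bool IsSystemResourceUnavailable(HttpStatusCode statusCode, SubStatusCodes? subStatusCode)
    {
        return statusCode == (HttpStatusCode)429
            && subStatusCode == SubStatusCodes.SystemResourceUnavailable;
    }
}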
@@ -92,7 +92,11 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(
ItemBatchOperationContext context = new ItemBatchOperationContext(
resolvedPartitionKeyRangeId,
trace,
BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, operation.OperationType, this.retryOptions));
BatchAsyncContainerExecutor.GetRetryPolicy(
this.cosmosContainer,
this.cosmosClientContext?.DocumentClient?.GlobalEndpointManager,
operation.OperationType,
this.retryOptions));

if (itemRequestOptions != null && itemRequestOptions.AddRequestHeaders != null)
{
@@ -159,6 +163,7 @@ internal virtual async Task ValidateOperationAsync(

private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
GlobalEndpointManager endpointManager,
OperationType operationType,
RetryOptions retryOptions)
{
@@ -167,6 +172,7 @@ private static IDocumentClientRetryPolicy GetRetryPolicy(
operationType,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager,
retryOptions.MaxRetryWaitTimeInSeconds));
}

67 changes: 55 additions & 12 deletions Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
@@ -48,6 +48,7 @@ public ClientRetryPolicy(
{
this.throttlingRetry = new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: globalEndpointManager,
retryOptions.MaxRetryWaitTimeInSeconds);

this.globalEndpointManager = globalEndpointManager;
@@ -120,7 +121,18 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
}
}

return await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);
ShouldRetryResult throttleRetryResult = await this.throttlingRetry.ShouldRetryAsync(exception, cancellationToken);

// Today, the only scenario where we would receive a ServiceUnavailableException from the Throttling Retry Policy
// is when we get 429 (Too Many Requests) with sub status code 3092 (System Resource Not Available). Note that this is applicable
// for write requests targeted at a multi-master account. In such a case, the 429/3092 gets converted into a 503.
if (throttleRetryResult.ExceptionToThrow is ServiceUnavailableException)
Member
Curious: an alternative is to do the 429/3092 check explicitly before the throttling policy here; that seems more direct and simpler, right?
Also, exception creation is expensive, and throttles tend to last longer, so the overall impact will be higher.

Member Author
The error combination we are trying to handle here, 429/3092, is itself a throttled response. Therefore, IMO the ideal place to handle such cases is the ResourceThrottleRetryPolicy. This avoids adding a status-code combination check to the ClientRetryPolicy that does not belong there.

On the creating-and-throwing-the-exception part, I will double check whether it can be optimized, but from the scenario-handling perspective, I feel the ResourceThrottleRetryPolicy is the ideal place.

@FabianMeiswinkel: Any thoughts on this?

{
return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
shouldMarkEndpointUnavailableForPkRange: true);
}

return throttleRetryResult;
}

/// <summary>
@@ -143,7 +155,18 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(
return shouldRetryResult;
}

return await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
ShouldRetryResult throttleRetryResult = await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);

// Today, the only scenario where we would receive a ServiceUnavailableException from the Throttling Retry Policy
// is when we get 429 (Too Many Requests) with sub status code 3092 (System Resource Not Available). Note that this is applicable
// for write requests targeted at a multi-master account. In such a case, the 429/3092 gets converted into a 503.
if (throttleRetryResult.ExceptionToThrow is ServiceUnavailableException)
{
return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
shouldMarkEndpointUnavailableForPkRange: true);
}

return throttleRetryResult;
}

/// <summary>
@@ -177,6 +200,7 @@ public void OnBeforeSendRequest(DocumentServiceRequest request)
// This enables marking the endpoint unavailability on endpoint failover/unreachability
this.locationEndpoint = this.globalEndpointManager.ResolveServiceEndpoint(request);
request.RequestContext.RouteToLocation(this.locationEndpoint);
this.throttlingRetry.OnBeforeSendRequest(request);
}

private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
@@ -274,16 +298,8 @@ private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
// Received 503 due to client connect timeout or Gateway
if (statusCode == HttpStatusCode.ServiceUnavailable)
{
DefaultTrace.TraceWarning("ClientRetryPolicy: ServiceUnavailable. Refresh cache and retry. Failed Location: {0}; ResourceAddress: {1}",
this.documentServiceRequest?.RequestContext?.LocationEndpointToRoute?.ToString() ?? string.Empty,
this.documentServiceRequest?.ResourceAddress ?? string.Empty);

// Mark the partition as unavailable.
// Let the ClientRetry logic decide if the request should be retried
this.partitionKeyRangeLocationCache.TryMarkEndpointUnavailableForPartitionKeyRange(
this.documentServiceRequest);

return this.ShouldRetryOnServiceUnavailable();
return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
shouldMarkEndpointUnavailableForPkRange: true);
}

return null;
@@ -406,6 +422,33 @@ private ShouldRetryResult ShouldRetryOnSessionNotAvailable(DocumentServiceRequest
}
}

/// <summary>
/// Attempts to mark the endpoint associated with the current partition key range as unavailable and determines if
/// a retry should be performed due to a ServiceUnavailable (503) response. This method is invoked when a 503
/// Service Unavailable response is received, indicating that the service might be temporarily unavailable.
/// It optionally marks the partition key range as unavailable, which will influence future routing decisions.
/// </summary>
/// <param name="shouldMarkEndpointUnavailableForPkRange">A boolean flag indicating whether the endpoint for the
/// current partition key range should be marked as unavailable.</param>
/// <returns>An instance of <see cref="ShouldRetryResult"/> indicating whether the operation should be retried.</returns>
private ShouldRetryResult TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
bool shouldMarkEndpointUnavailableForPkRange)
{
DefaultTrace.TraceWarning("ClientRetryPolicy: ServiceUnavailable. Refresh cache and retry. Failed Location: {0}; ResourceAddress: {1}",
this.documentServiceRequest?.RequestContext?.LocationEndpointToRoute?.ToString() ?? string.Empty,
this.documentServiceRequest?.ResourceAddress ?? string.Empty);

if (shouldMarkEndpointUnavailableForPkRange)
{
// Mark the partition as unavailable.
// Let the ClientRetry logic decide if the request should be retried
this.partitionKeyRangeLocationCache.TryMarkEndpointUnavailableForPartitionKeyRange(
this.documentServiceRequest);
}

return this.ShouldRetryOnServiceUnavailable();
}

/// <summary>
/// For a ServiceUnavailable (503.0) we could be hitting a timeout from Direct/TCP locally, or a Gateway request could return a similar response because an endpoint is not yet available.
/// We try and retry the request only if there are other regions available. The retry logic is applicable for single master write accounts as well.
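For comparison with the approach the PR takes, the reviewer's alternative above — checking 429/3092 in ClientRetryPolicy before delegating to the throttling policy — would look roughly like the sketch below. This is hypothetical, not the PR's code: the fields isReadRequest and canUseMultipleWriteLocations are assumed to exist on ClientRetryPolicy, and the rest mirrors members visible in the diff.

// Hypothetical alternative: short-circuit 429/3092 in ClientRetryPolicy itself,
// before invoking ResourceThrottleRetryPolicy, instead of converting it to a 503 there.
if (cosmosResponseMessage?.StatusCode == (HttpStatusCode)429
    && cosmosResponseMessage.Headers.SubStatusCode == SubStatusCodes.SystemResourceUnavailable
    && !this.isReadRequest
    && this.canUseMultipleWriteLocations)
{
    // Skip the local throttling back-off entirely and go straight to the
    // cross-regional retry path used for 503s.
    return this.TryMarkEndpointUnavailableForPkRangeAndRetryOnServiceUnavailable(
        shouldMarkEndpointUnavailableForPkRange: true);
}

return await this.throttlingRetry.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);

The author's counter-argument in the thread is that 429/3092 is still a throttling response, so keeping the classification inside ResourceThrottleRetryPolicy avoids leaking throttling-specific status checks into ClientRetryPolicy.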
5 changes: 3 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
@@ -988,8 +988,9 @@ internal virtual void Initialize(Uri serviceEndpoint,
this.initializeTaskFactory = (_) => TaskHelper.InlineIfPossible<bool>(
() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory),
new ResourceThrottleRetryPolicy(
this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));
maxAttemptCount: this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests,
endpointManager: this.GlobalEndpointManager,
maxWaitTimeInSeconds: this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds));

// Create the task to start the initialize task
// Task will be awaited on in the EnsureValidClientAsync
@@ -72,6 +72,7 @@ public MetadataRequestThrottleRetryPolicy(

this.throttlingRetryPolicy = new ResourceThrottleRetryPolicy(
maxRetryAttemptsOnThrottledRequests,
this.globalEndpointManager,
maxRetryWaitTimeInSeconds);

this.retryContext = new MetadataRetryContext
38 changes: 35 additions & 3 deletions Microsoft.Azure.Cosmos/src/ResourceThrottleRetryPolicy.cs
@@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

// Retry when we receive throttling from the server.
@@ -19,12 +20,15 @@ internal sealed class ResourceThrottleRetryPolicy : IDocumentClientRetryPolicy
private readonly uint backoffDelayFactor;
private readonly int maxAttemptCount;
private readonly TimeSpan maxWaitTimeInMilliseconds;
private readonly IGlobalEndpointManager globalEndpointManager;

private int currentAttemptCount;
private TimeSpan cumulativeRetryDelay;
private bool? isMultiMasterWriteRegion;

public ResourceThrottleRetryPolicy(
int maxAttemptCount,
IGlobalEndpointManager endpointManager,
int maxWaitTimeInSeconds = DefaultMaxWaitTimeInSeconds,
uint backoffDelayFactor = 1)
{
@@ -33,6 +37,7 @@ public ResourceThrottleRetryPolicy(
throw new ArgumentException("maxWaitTimeInSeconds", "maxWaitTimeInSeconds must be less than " + (int.MaxValue / 1000));
}

this.globalEndpointManager = endpointManager;
this.maxAttemptCount = maxAttemptCount;
this.backoffDelayFactor = backoffDelayFactor;
this.maxWaitTimeInMilliseconds = TimeSpan.FromSeconds(maxWaitTimeInSeconds);
@@ -59,7 +64,9 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(dce.RetryAfter);
return this.ShouldRetryInternalAsync(
dce?.GetSubStatus(),
dce?.RetryAfter);
}

DefaultTrace.TraceError(
@@ -88,11 +95,34 @@ public Task<ShouldRetryResult> ShouldRetryAsync(
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.ShouldRetryInternalAsync(cosmosResponseMessage?.Headers.RetryAfter);
return this.ShouldRetryInternalAsync(
cosmosResponseMessage?.Headers.SubStatusCode,
cosmosResponseMessage?.Headers.RetryAfter,
cosmosResponseMessage?.CosmosException);
}

private Task<ShouldRetryResult> ShouldRetryInternalAsync(TimeSpan? retryAfter)
private Task<ShouldRetryResult> ShouldRetryInternalAsync(
SubStatusCodes? subStatusCode,
TimeSpan? retryAfter,
Exception exception = null)
{
if (this.isMultiMasterWriteRegion.HasValue
&& this.isMultiMasterWriteRegion.Value
&& subStatusCode != null
&& subStatusCode == SubStatusCodes.SystemResourceUnavailable)
{
DefaultTrace.TraceError(
"Operation will NOT be retried. Converting SystemResourceUnavailable (429/3092) to ServiceUnavailable (503). Current attempt {0} sub status code: {1}.",
this.currentAttemptCount, SubStatusCodes.SystemResourceUnavailable);

ServiceUnavailableException exceptionToThrow = ServiceUnavailableException.Create(
SubStatusCodes.SystemResourceUnavailable,
innerException: exception);

return Task.FromResult(
ShouldRetryResult.NoRetry(exceptionToThrow));
}

TimeSpan retryDelay = TimeSpan.Zero;
if (this.currentAttemptCount < this.maxAttemptCount &&
this.CheckIfRetryNeeded(retryAfter, out retryDelay))
@@ -133,6 +163,8 @@ private object GetExceptionMessage(Exception exception)
/// <param name="request">The request being sent to the service.</param>
public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.isMultiMasterWriteRegion = !request.IsReadOnlyRequest
&& (this.globalEndpointManager?.CanUseMultipleWriteLocations(request) ?? false);
}

/// <summary>
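Summarizing the ResourceThrottleRetryPolicy changes above: OnBeforeSendRequest now records whether the request is a write against an account that can use multiple write locations, and ShouldRetryInternalAsync escalates 429/3092 to a 503 only when that flag is set. A condensed sketch of that decision, with the class and method names chosen here for illustration (only SubStatusCodes.SystemResourceUnavailable comes from the SDK):

using System;
using Microsoft.Azure.Documents;

internal static class ThrottleDecisionSketch
{
    // Illustrative summary of how the policy now treats the two flavors of throttling.
    internal static string Decide(
        bool isWriteRequest,
        bool canUseMultipleWriteLocations,
        SubStatusCodes? subStatusCode,
        TimeSpan? retryAfter,
        int currentAttempt,
        int maxAttempts)
    {
        if (isWriteRequest
            && canUseMultipleWriteLocations
            && subStatusCode == SubStatusCodes.SystemResourceUnavailable)
        {
            // New behavior: no local back-off; surface a ServiceUnavailableException so
            // ClientRetryPolicy can mark the endpoint unavailable and retry cross-regionally.
            return "NoRetry + ServiceUnavailableException (503)";
        }

        if (currentAttempt < maxAttempts)
        {
            // Existing behavior: back off locally, honoring the server-provided RetryAfter.
            TimeSpan delay = retryAfter ?? TimeSpan.Zero;
            return $"Retry after {delay.TotalMilliseconds} ms (local back-off)";
        }

        return "NoRetry (throttle retry attempts exhausted)";
    }
}

Regular 429s keep their existing local back-off; only the 3092 variant on multi-master writes changes behavior.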
9 changes: 5 additions & 4 deletions Microsoft.Azure.Cosmos/src/Routing/LocationCache.cs
@@ -529,11 +529,12 @@ public bool ShouldRefreshEndpoints(out bool canRefreshInBackground)

public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
Member
It's used in other scenarios too, such as evaluating whether or not to send the sessionToken.
The SDK's region count might be stale, and it should not be used to make the decision to not pass the sessionToken.

These are two different use cases and should be distinguished.

Member Author
Ideally, the background account refresh (InitializeAccountPropertiesAndStartBackgroundRefresh()) should take care of the staleness, shouldn't it? Anyway, I will refactor the code to create another method just to safeguard the existing code path.

{
return this.CanUseMultipleWriteLocations() &&
(request.ResourceType == ResourceType.Document ||
return this.CanUseMultipleWriteLocations()
&& this.locationInfo.AvailableWriteLocations.Count > 1
&& (request.ResourceType == ResourceType.Document ||
(request.ResourceType == ResourceType.StoredProcedure && request.OperationType == Documents.OperationType.ExecuteJavaScript));
}

}
private void ClearStaleEndpointUnavailabilityInfo()
{
if (this.locationUnavailablityInfoByEndpoint.Any())
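The refactor the author agrees to above could look roughly like the sketch below: the existing method keeps its current semantics for callers on the session-token path, while a new, hypothetically named method adds the region-count guard for retry decisions. Both members are sketched inside LocationCache; the new method's name and exact shape are assumptions, not the PR's final code.

// Existing semantics preserved for callers that decide whether to send a session token.
public bool CanUseMultipleWriteLocations(DocumentServiceRequest request)
{
    return this.CanUseMultipleWriteLocations()
        && (request.ResourceType == ResourceType.Document ||
            (request.ResourceType == ResourceType.StoredProcedure
                && request.OperationType == Documents.OperationType.ExecuteJavaScript));
}

// Hypothetical companion for the retry path: additionally require that more than one
// write location is currently known, so a stale single-region view of the account
// does not trigger cross-regional retry behavior.
public bool CanSupportMultipleWriteLocationsForRetry(DocumentServiceRequest request)
{
    return this.CanUseMultipleWriteLocations(request)
        && this.locationInfo.AvailableWriteLocations.Count > 1;
}

The diff above takes the simpler route of adding the count check inside CanUseMultipleWriteLocations(request) itself, which is what the reviewer is pushing back on.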
@@ -23,6 +23,23 @@ public class BatchAsyncBatcherTests
{
private static readonly Exception expectedException = new Exception();
private static readonly BatchPartitionMetric metric = new BatchPartitionMetric();
private GlobalEndpointManager mockedEndpointManager;

[TestInitialize]
public void Initialize()
{
Mock<IDocumentClientInternal> mockedClient = new();

this.mockedEndpointManager = new(
mockedClient.Object,
new ConnectionPolicy());
}

[TestCleanup]
public void Cleanup()
{
this.mockedEndpointManager.Dispose();
}

private ItemBatchOperation CreateItemBatchOperation(bool withContext = false)
{
@@ -565,12 +582,12 @@ public async Task RetrierGetsCalledOnSplit()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -594,12 +611,12 @@ public async Task RetrierGetsCalledOnCompletingSplit()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -623,12 +640,12 @@ public async Task RetrierGetsCalledOnCompletingPartitionMigration()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -672,17 +689,17 @@ public async Task RetrierGetsCalledOn413_3402()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();
@@ -710,17 +727,17 @@ public async Task RetrierGetsCalledOn413_NoSubstatus()
IDocumentClientRetryPolicy retryPolicy1 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy2 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Read,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

IDocumentClientRetryPolicy retryPolicy3 = new BulkExecutionRetryPolicy(
GetSplitEnabledContainer(),
OperationType.Create,
new ResourceThrottleRetryPolicy(1));
new ResourceThrottleRetryPolicy(1, this.mockedEndpointManager));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
ItemBatchOperation operation2 = this.CreateItemBatchOperation();