Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk: Fixes incorrect routing on split #2069

Merged
merged 8 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimerInSeconds = 1;
private const int TimerWheelBucketCount = 20;
private static readonly TimeSpan TimerWheelResolution = TimeSpan.FromMilliseconds(50);

Expand All @@ -53,11 +52,6 @@ public BatchAsyncContainerExecutor(
int maxServerRequestOperationCount,
int maxServerRequestBodyLength)
{
if (cosmosContainer == null)
{
throw new ArgumentNullException(nameof(cosmosContainer));
}

if (maxServerRequestOperationCount < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxServerRequestOperationCount));
Expand All @@ -68,7 +62,7 @@ public BatchAsyncContainerExecutor(
throw new ArgumentOutOfRangeException(nameof(maxServerRequestBodyLength));
}

this.cosmosContainer = cosmosContainer;
this.cosmosContainer = cosmosContainer ?? throw new ArgumentNullException(nameof(cosmosContainer));
this.cosmosClientContext = cosmosClientContext;
this.maxServerRequestBodyLength = maxServerRequestBodyLength;
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
Expand All @@ -90,7 +84,7 @@ public virtual async Task<TransactionalBatchOperationResult> AddAsync(

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.cosmosContainer, this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.OperationTask;
Expand Down Expand Up @@ -137,10 +131,13 @@ internal virtual async Task ValidateOperationAsync(
await operation.MaterializeResourceAsync(this.cosmosClientContext.SerializerCore, cancellationToken);
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
private static IDocumentClientRetryPolicy GetRetryPolicy(
ContainerInternal containerInternal,
RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
containerInternal,
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}
Expand Down Expand Up @@ -186,6 +183,7 @@ private async Task ReBatchAsync(
CancellationToken cancellationToken)
{
string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
operation.Context.ReRouteOperation(resolvedPartitionKeyRangeId);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
streamer.Add(operation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal class ItemBatchOperationContext : IDisposable
{
public string PartitionKeyRangeId { get; }
public string PartitionKeyRangeId { get; private set; }

public BatchAsyncBatcher CurrentBatcher { get; set; }

Expand Down Expand Up @@ -74,6 +74,11 @@ public void Fail(
this.Dispose();
}

public void ReRouteOperation(string newPartitionKeyRangeId)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
this.PartitionKeyRangeId = newPartitionKeyRangeId;
}

public void Dispose()
{
this.CurrentBatcher = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;

/// <summary>
Expand All @@ -17,77 +18,89 @@ namespace Microsoft.Azure.Cosmos
/// <see cref="ItemBatchOperationContext"/>
internal sealed class BulkPartitionKeyRangeGoneRetryPolicy : IDocumentClientRetryPolicy
{
private const int MaxRetries = 1;

private readonly IDocumentClientRetryPolicy nextRetryPolicy;
private readonly ContainerInternal container;

private int retriesAttempted;

public BulkPartitionKeyRangeGoneRetryPolicy(IDocumentClientRetryPolicy nextRetryPolicy)
public BulkPartitionKeyRangeGoneRetryPolicy(
ContainerInternal container,
IDocumentClientRetryPolicy nextRetryPolicy)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.nextRetryPolicy = nextRetryPolicy;
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
Exception exception,
CancellationToken cancellationToken)
{
DocumentClientException clientException = exception as DocumentClientException;

ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(
clientException?.StatusCode,
clientException?.GetSubStatus());

if (shouldRetryResult != null)
if (exception is DocumentClientException clientException)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
return Task.FromResult(shouldRetryResult);
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
clientException?.StatusCode,
clientException?.GetSubStatus(),
cancellationToken);

if (shouldRetryResult != null)
{
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return ShouldRetryResult.NoRetry();
}
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

return this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(exception, cancellationToken);
}

public Task<ShouldRetryResult> ShouldRetryAsync(
public async Task<ShouldRetryResult> ShouldRetryAsync(
ResponseMessage cosmosResponseMessage,
CancellationToken cancellationToken)
{
ShouldRetryResult shouldRetryResult = this.ShouldRetryInternal(cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode);
ShouldRetryResult shouldRetryResult = await this.ShouldRetryInternalAsync(
cosmosResponseMessage?.StatusCode,
cosmosResponseMessage?.Headers.SubStatusCode,
cancellationToken);
if (shouldRetryResult != null)
{
return Task.FromResult(shouldRetryResult);
return shouldRetryResult;
}

if (this.nextRetryPolicy == null)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
return ShouldRetryResult.NoRetry();
}

return this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
return await this.nextRetryPolicy.ShouldRetryAsync(cosmosResponseMessage, cancellationToken);
}

public void OnBeforeSendRequest(DocumentServiceRequest request)
{
this.nextRetryPolicy.OnBeforeSendRequest(request);
}

private ShouldRetryResult ShouldRetryInternal(
private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
HttpStatusCode? statusCode,
SubStatusCodes? subStatusCode)
SubStatusCodes? subStatusCode,
CancellationToken cancellationToken)
{
if (statusCode == HttpStatusCode.Gone
&& (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
|| subStatusCode == SubStatusCodes.NameCacheIsStale
|| subStatusCode == SubStatusCodes.CompletingSplit
|| subStatusCode == SubStatusCodes.CompletingPartitionMigration)
&& this.retriesAttempted < MaxRetries)
if (statusCode == HttpStatusCode.Gone)
{
this.retriesAttempted++;
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
if (subStatusCode == SubStatusCodes.PartitionKeyRangeGone
|| subStatusCode == SubStatusCodes.CompletingSplit
|| subStatusCode == SubStatusCodes.CompletingPartitionMigration)
{
PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
j82w marked this conversation as resolved.
Show resolved Hide resolved
string containerRid = await this.container.GetCachedRIDAsync(forceRefresh: false, cancellationToken: cancellationToken);
await partitionKeyRangeCache.TryGetOverlappingRangesAsync(containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true);
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}

if (subStatusCode == SubStatusCodes.NameCacheIsStale)
{
return ShouldRetryResult.RetryAfter(TimeSpan.Zero);
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
Expand Down Expand Up @@ -448,9 +449,11 @@ public async Task CannotAddToDispatchedBatch()
public async Task RetrierGetsCalledOnSplit()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
Expand All @@ -475,9 +478,11 @@ public async Task RetrierGetsCalledOnSplit()
public async Task RetrierGetsCalledOnCompletingSplit()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
Expand All @@ -502,9 +507,11 @@ public async Task RetrierGetsCalledOnCompletingSplit()
public async Task RetrierGetsCalledOnCompletingPartitionMigration()
{
IDocumentClientRetryPolicy retryPolicy1 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

IDocumentClientRetryPolicy retryPolicy2 = new BulkPartitionKeyRangeGoneRetryPolicy(
GetSplitEnabledContainer(),
new ResourceThrottleRetryPolicy(1));

ItemBatchOperation operation1 = this.CreateItemBatchOperation();
Expand Down Expand Up @@ -545,6 +552,16 @@ public async Task RetrierGetsCalledOnOverFlow()
retryDelegate.Verify(a => a(It.IsAny<ItemBatchOperation>(), It.IsAny<CancellationToken>()), Times.Once);
}

private static ContainerInternal GetSplitEnabledContainer()
{
Mock<ContainerInternal> container = new Mock<ContainerInternal>();
container.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>())).ReturnsAsync(Guid.NewGuid().ToString());
Mock<CosmosClientContext> context = new Mock<CosmosClientContext>();
container.Setup(c => c.ClientContext).Returns(context.Object);
context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());
return container.Object;
}

private class BatchAsyncBatcherThatOverflows : BatchAsyncBatcher
{
public BatchAsyncBatcherThatOverflows(
Expand All @@ -565,5 +582,28 @@ internal override async Task<Tuple<PartitionKeyRangeServerBatchRequest, ArraySeg
return new Tuple<PartitionKeyRangeServerBatchRequest, ArraySegment<ItemBatchOperation>>(serverRequest, new ArraySegment<ItemBatchOperation>(serverRequest.Operations.ToArray(), 1, 1));
}
}

private class ClientWithSplitDetection : MockDocumentClient
{
private readonly Mock<PartitionKeyRangeCache> partitionKeyRangeCache;

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
It.IsAny<Documents.Routing.Range<string>>(),
It.Is<bool>(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails
)
).Returns((string collectionRid, Documents.Routing.Range<string> range, bool forceRefresh) => Task.FromResult<IReadOnlyList<PartitionKeyRange>>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
}

internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
{
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public async Task RetryOnSplit()
Mock<ContainerInternal> mockContainer = new Mock<ContainerInternal>();
mockContainer.Setup(x => x.LinkUri).Returns(link);
mockContainer.Setup(x => x.GetPartitionKeyDefinitionAsync(It.IsAny<CancellationToken>())).Returns(Task.FromResult(new PartitionKeyDefinition() { Paths = new Collection<string>() { "/id" } }));
mockContainer.Setup(c => c.GetCachedRIDAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>())).ReturnsAsync(Guid.NewGuid().ToString());
Mock<CosmosClientContext> context = new Mock<CosmosClientContext>();
mockContainer.Setup(c => c.ClientContext).Returns(context.Object);
context.Setup(c => c.DocumentClient).Returns(new ClientWithSplitDetection());


CollectionRoutingMap routingMap = CollectionRoutingMap.TryCreateCompleteRoutingMap(
new[]
Expand Down Expand Up @@ -449,5 +454,28 @@ private class MyDocument

public bool Updated { get; set; }
}

private class ClientWithSplitDetection : MockDocumentClient
{
private readonly Mock<PartitionKeyRangeCache> partitionKeyRangeCache;

public ClientWithSplitDetection()
{
this.partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(MockBehavior.Strict, null, null, null);
this.partitionKeyRangeCache.Setup(
m => m.TryGetOverlappingRangesAsync(
It.IsAny<string>(),
It.IsAny<Documents.Routing.Range<string>>(),
It.Is<bool>(b => b == true) // Mocking only the refresh, if it doesn't get called, the test fails
)
).Returns((string collectionRid, Documents.Routing.Range<string> range, bool forceRefresh) => Task.FromResult<IReadOnlyList<PartitionKeyRange>>(this.ResolveOverlapingPartitionKeyRanges(collectionRid, range, forceRefresh)));
}

internal override Task<PartitionKeyRangeCache> GetPartitionKeyRangeCacheAsync()
{
return Task.FromResult(this.partitionKeyRangeCache.Object);
}

}
}
}
Loading