Skip to content

Commit

Permalink
merge to latest
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake Willey committed Sep 10, 2019
2 parents 3e4b5ac + 96ab2f8 commit 22481e0
Show file tree
Hide file tree
Showing 57 changed files with 2,258 additions and 334 deletions.
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/AuthorizationHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Cosmos
internal sealed class AuthorizationHelper
{
public const int MaxAuthorizationHeaderSize = 1024;

// This API is a helper method to create auth header based on client request.
// Uri is split into resourceType/resourceId -
// For feed/post/put requests, resourceId = parentId,
Expand Down
21 changes: 10 additions & 11 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,21 @@ public virtual bool TryAdd(ItemBatchOperation operation)
try
{
PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken);

if (result.IsSplit())
{
foreach (ItemBatchOperation operationToRetry in result.Operations)
{
await this.retrier(operationToRetry, cancellationToken);
}

return;
}

using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.cosmosSerializer))
{
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
{
BatchOperationResult response = batchResponse[itemBatchOperation.OperationIndex];
if (!response.IsSuccessStatusCode)
{
Documents.ShouldRetryResult shouldRetry = await itemBatchOperation.Context.ShouldRetryAsync(response, cancellationToken);
if (shouldRetry.ShouldRetry)
{
await this.retrier(itemBatchOperation, cancellationToken);
continue;
}
}

itemBatchOperation.Context.Complete(this, response);
}
}
Expand Down
29 changes: 23 additions & 6 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace Microsoft.Azure.Cosmos
/// <seealso cref="BatchAsyncStreamer"/>
internal class BatchAsyncContainerExecutor : IDisposable
{
private const int DefaultDispatchTimer = 10;
private const int DefaultDispatchTimerInSeconds = 1;
private const int MinimumDispatchTimerInSeconds = 1;

private readonly ContainerCore cosmosContainer;
Expand All @@ -36,13 +36,21 @@ internal class BatchAsyncContainerExecutor : IDisposable
private readonly ConcurrentDictionary<string, BatchAsyncStreamer> streamersByPartitionKeyRange = new ConcurrentDictionary<string, BatchAsyncStreamer>();
private readonly ConcurrentDictionary<string, SemaphoreSlim> limitersByPartitionkeyRange = new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly TimerPool timerPool;
private readonly RetryOptions retryOptions;

/// <summary>
/// For unit testing.
/// </summary>
internal BatchAsyncContainerExecutor()
{
}

public BatchAsyncContainerExecutor(
ContainerCore cosmosContainer,
CosmosClientContext cosmosClientContext,
int maxServerRequestOperationCount,
int maxServerRequestBodyLength,
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimer)
int dispatchTimerInSeconds = BatchAsyncContainerExecutor.DefaultDispatchTimerInSeconds)
{
if (cosmosContainer == null)
{
Expand Down Expand Up @@ -70,9 +78,10 @@ public BatchAsyncContainerExecutor(
this.maxServerRequestOperationCount = maxServerRequestOperationCount;
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = new TimerPool(BatchAsyncContainerExecutor.MinimumDispatchTimerInSeconds);
this.retryOptions = cosmosClientContext.ClientOptions.GetConnectionPolicy().RetryOptions;
}

public async Task<BatchOperationResult> AddAsync(
public virtual async Task<BatchOperationResult> AddAsync(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
Expand All @@ -86,10 +95,10 @@ public async Task<BatchOperationResult> AddAsync(

string resolvedPartitionKeyRangeId = await this.ResolvePartitionKeyRangeIdAsync(operation, cancellationToken).ConfigureAwait(false);
BatchAsyncStreamer streamer = this.GetOrAddStreamerForPartitionKeyRange(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId);
ItemBatchOperationContext context = new ItemBatchOperationContext(resolvedPartitionKeyRangeId, BatchAsyncContainerExecutor.GetRetryPolicy(this.retryOptions));
operation.AttachContext(context);
streamer.Add(operation);
return await context.Task;
return await context.OperationTask;
}

public void Dispose()
Expand All @@ -107,7 +116,7 @@ public void Dispose()
this.timerPool.Dispose();
}

internal async Task ValidateOperationAsync(
internal virtual async Task ValidateOperationAsync(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions = null,
CancellationToken cancellationToken = default(CancellationToken))
Expand Down Expand Up @@ -135,6 +144,14 @@ internal async Task ValidateOperationAsync(
}
}

private static IDocumentClientRetryPolicy GetRetryPolicy(RetryOptions retryOptions)
{
return new BulkPartitionKeyRangeGoneRetryPolicy(
new ResourceThrottleRetryPolicy(
retryOptions.MaxRetryAttemptsOnThrottledRequests,
retryOptions.MaxRetryWaitTimeInSeconds));
}

private static bool ValidateOperationEPK(
ItemBatchOperation operation,
ItemRequestOptions itemRequestOptions)
Expand Down
50 changes: 45 additions & 5 deletions Microsoft.Azure.Cosmos/src/Batch/BatchExecUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;

/// <summary>
/// Util methods for batch requests.
Expand Down Expand Up @@ -86,10 +89,50 @@ public static async Task<Memory<byte>> StreamToMemoryAsync(
}
}

public static string GetPartitionKeyRangeId(PartitionKeyInternal partitionKeyInternal, PartitionKeyDefinition partitionKeyDefinition, CollectionRoutingMap collectionRoutingMap)
{
string effectivePartitionKey = partitionKeyInternal.GetEffectivePartitionKeyString(partitionKeyDefinition);
return collectionRoutingMap.GetRangeByEffectivePartitionKey(effectivePartitionKey).Id;
}

public static void GetServerRequestLimits(out int maxServerRequestBodyLength, out int maxServerRequestOperationCount)
{
maxServerRequestBodyLength = Constants.MaxDirectModeBatchRequestBodySizeInBytes;
maxServerRequestOperationCount = Constants.MaxOperationsInDirectModeBatchRequest;
}

public static ResponseMessage EnsureValidStream(
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions,
int? maxOperationCount = null)
{
string errorMessage = BatchExecUtils.EnsureValidInternal(operations, batchOptions, maxOperationCount);

if (errorMessage != null)
{
return new ResponseMessage(HttpStatusCode.BadRequest, errorMessage: errorMessage);
}

return new ResponseMessage(HttpStatusCode.OK);
}

public static void EnsureValid(
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions,
int? maxOperationCount = null)
{
string errorMessage = BatchExecUtils.EnsureValidInternal(operations, batchOptions, maxOperationCount);

if (errorMessage != null)
{
throw new ArgumentException(errorMessage);
}
}

private static string EnsureValidInternal(
IReadOnlyList<ItemBatchOperation> operations,
RequestOptions batchOptions,
int? maxOperationCount = null)
{
string errorMessage = null;

Expand Down Expand Up @@ -130,18 +173,15 @@ public static void EnsureValid(
WFConstants.BackendHeaders.EffectivePartitionKeyString);
}

if (operation.PartitionKey != null)
if (operation.PartitionKey != null && !operation.RequestOptions.IsEffectivePartitionKeyRouting)
{
errorMessage = ClientResources.PKAndEpkSetTogether;
}
}
}
}

if (errorMessage != null)
{
throw new ArgumentException(errorMessage);
}
return errorMessage;
}

public static string GetPartitionKeyRangeId(PartitionKey partitionKey, PartitionKeyDefinition partitionKeyDefinition, Routing.CollectionRoutingMap collectionRoutingMap)
Expand Down
17 changes: 17 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchItemRequestOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,22 @@ class BatchItemRequestOptions : RequestOptions
/// <seealso cref="Microsoft.Azure.Cosmos.IndexingPolicy"/>
/// <seealso cref="IndexingDirective"/>
public IndexingDirective? IndexingDirective { get; set; }

internal static BatchItemRequestOptions FromItemRequestOptions(ItemRequestOptions itemRequestOptions)
{
if (itemRequestOptions == null)
{
return null;
}

RequestOptions requestOptions = itemRequestOptions as RequestOptions;
BatchItemRequestOptions batchItemRequestOptions = new BatchItemRequestOptions();
batchItemRequestOptions.IndexingDirective = itemRequestOptions.IndexingDirective;
batchItemRequestOptions.IfMatchEtag = requestOptions.IfMatchEtag;
batchItemRequestOptions.IfNoneMatchEtag = requestOptions.IfNoneMatchEtag;
batchItemRequestOptions.Properties = requestOptions.Properties;
batchItemRequestOptions.IsEffectivePartitionKeyRouting = requestOptions.IsEffectivePartitionKeyRouting;
return batchItemRequestOptions;
}
}
}
12 changes: 11 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchOperationResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ private static Result ReadOperationResult(ref RowReader reader, out BatchOperati

return Result.Success;
}

internal ResponseMessage ToResponseMessage()
{
ResponseMessage responseMessage = new ResponseMessage(this.StatusCode);
responseMessage.Headers.SubStatusCode = this.SubStatusCode;
responseMessage.Headers.ETag = this.ETag;
responseMessage.Headers.RetryAfter = this.RetryAfter;
responseMessage.Content = this.ResourceStream;
return responseMessage;
}
}

/// <summary>
Expand All @@ -194,7 +204,7 @@ class BatchOperationResult<T> : BatchOperationResult
/// <summary>
/// Initializes a new instance of the <see cref="BatchOperationResult{T}"/> class.
/// </summary>
/// <param name="result">CosmosBatchOperationResult with stream resource.</param>
/// <param name="result">BatchOperationResult with stream resource.</param>
/// <param name="resource">Deserialized resource.</param>
internal BatchOperationResult(BatchOperationResult result, T resource)
: base(result)
Expand Down
4 changes: 4 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,11 @@ public virtual IEnumerator<BatchOperationResult> GetEnumerator()
/// Gets all the Activity IDs associated with the response.
/// </summary>
/// <returns>An enumerable that contains the Activity IDs.</returns>
#if INTERNAL
public virtual IEnumerable<string> GetActivityIds()
#else
internal virtual IEnumerable<string> GetActivityIds()
#endif
{
yield return this.ActivityId;
}
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ internal class ItemBatchOperation : IDisposable
public ItemBatchOperation(
OperationType operationType,
int operationIndex,
PartitionKey? partitionKey,
PartitionKey partitionKey,
string id = null,
Stream resourceStream = null,
BatchItemRequestOptions requestOptions = null)
Expand All @@ -55,7 +55,7 @@ public ItemBatchOperation(
this.RequestOptions = requestOptions;
}

public PartitionKey? PartitionKey { get; }
public PartitionKey? PartitionKey { get; internal set; }

public string Id { get; }

Expand Down
28 changes: 26 additions & 2 deletions Microsoft.Azure.Cosmos/src/Batch/ItemBatchOperationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
namespace Microsoft.Azure.Cosmos
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;

/// <summary>
/// Context for a particular Batch operation.
Expand All @@ -17,13 +19,35 @@ internal class ItemBatchOperationContext : IDisposable

public BatchAsyncBatcher CurrentBatcher { get; set; }

public Task<BatchOperationResult> Task => this.taskCompletionSource.Task;
public Task<BatchOperationResult> OperationTask => this.taskCompletionSource.Task;

private readonly IDocumentClientRetryPolicy retryPolicy;

private TaskCompletionSource<BatchOperationResult> taskCompletionSource = new TaskCompletionSource<BatchOperationResult>();

public ItemBatchOperationContext(string partitionKeyRangeId)
public ItemBatchOperationContext(
string partitionKeyRangeId,
IDocumentClientRetryPolicy retryPolicy = null)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
this.retryPolicy = retryPolicy;
}

/// <summary>
/// Based on the Retry Policy, if a failed response should retry.
/// </summary>
public Task<ShouldRetryResult> ShouldRetryAsync(
BatchOperationResult batchOperationResult,
CancellationToken cancellationToken)
{
if (this.retryPolicy == null
|| batchOperationResult.IsSuccessStatusCode)
{
return Task.FromResult(ShouldRetryResult.NoRetry());
}

ResponseMessage responseMessage = batchOperationResult.ToResponseMessage();
return this.retryPolicy.ShouldRetryAsync(responseMessage, cancellationToken);
}

public void Complete(
Expand Down
Loading

0 comments on commit 22481e0

Please sign in to comment.