Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk stream support #585

Merged
merged 44 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
813f193
migrating base classes
ealsur Jul 23, 2019
34f7653
Adding tests
ealsur Jul 23, 2019
643c082
Adding executor tests
ealsur Jul 23, 2019
d272a61
Applying master changes
ealsur Jul 23, 2019
e2add6e
merge with master
ealsur Jul 29, 2019
03478eb
Comments
ealsur Jul 29, 2019
3a84cbf
Removing some blocking calls
ealsur Jul 29, 2019
7dc7870
Removing more blocking calls
ealsur Jul 29, 2019
ef25381
Removing final blocking calls
ealsur Jul 29, 2019
2c4332d
Missing continuation token
ealsur Jul 29, 2019
dcbac64
Timer dispatch
ealsur Jul 29, 2019
121c4b3
Async Add
ealsur Jul 29, 2019
f966abb
ConfigureAwait
ealsur Jul 30, 2019
f28cd51
Refactoring limiters
ealsur Jul 30, 2019
5ed005c
Fixing test
ealsur Jul 30, 2019
e696741
Addressing comments
ealsur Aug 1, 2019
9e75160
pending tasks
ealsur Aug 1, 2019
9e1a0df
Removing lock
ealsur Aug 1, 2019
0122a62
Adding Dispose UTs
ealsur Aug 1, 2019
30e71de
Pushing retry to batcher
ealsur Aug 1, 2019
8e968fd
Missing using
ealsur Aug 1, 2019
7eed1c2
Removing unnecessary reprocesing of PKRange
ealsur Aug 5, 2019
80616bc
New UTs for Executor
ealsur Aug 5, 2019
639b577
merge with master
ealsur Aug 6, 2019
82d08f6
Merge branch 'master' into users/ealsur/bulkstream
ealsur Aug 7, 2019
133fb9e
Support partial splits
ealsur Aug 7, 2019
f951940
volatile
ealsur Aug 7, 2019
681b64d
removing
ealsur Aug 7, 2019
4dd74b9
Semaphore wrapper
ealsur Aug 7, 2019
8d09157
Spelling
ealsur Aug 7, 2019
ec1ba7c
Refactoring to requeue failed operations
ealsur Aug 7, 2019
cdf0ec6
Refactoring
ealsur Aug 8, 2019
7335a82
Reducing semaphores and fixing bug
ealsur Aug 9, 2019
3146612
Rename
ealsur Aug 9, 2019
c5e17b3
Dispatch outside
ealsur Aug 9, 2019
bd25f19
Variable rename
ealsur Aug 12, 2019
a000f0c
Reducing memory footprint
ealsur Aug 12, 2019
784f3b7
Test
ealsur Aug 12, 2019
fda8fbd
Comments
ealsur Aug 14, 2019
15154de
Batcher refactor
ealsur Aug 15, 2019
47184fd
Adding interlock checker
ealsur Aug 16, 2019
cc8a483
Refactoring itemoperation and context
ealsur Aug 16, 2019
974eaef
New tests
ealsur Aug 16, 2019
3a6ce27
Addressing comments
ealsur Aug 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;

/// <summary>
/// Maintains a batch of operations and dispatches it as a unit of work.
/// </summary>
/// <remarks>
/// The dispatch process consists of:
/// 1. Creating a <see cref="PartitionKeyRangeServerBatchRequest"/>.
/// 2. Verifying overflow that might happen due to HybridRow serialization. Any operations that did not fit, get sent to the <see cref="BatchAsyncBatcherRetryDelegate"/>.
/// 3. Execution of the request gets delegated to <see cref="BatchAsyncBatcherExecuteDelegate"/>.
/// 4. If there was a split detected, all operations in the request, are sent to the <see cref="BatchAsyncBatcherRetryDelegate"/> for re-queueing.
/// 5. The result of the request is used to wire up all responses with the original Tasks for each operation.
/// </remarks>
/// <seealso cref="BatchAsyncOperationContext"/>
internal class BatchAsyncBatcher
{
private readonly CosmosSerializer cosmosSerializer;
private readonly List<BatchAsyncOperationContext> batchOperations;
private readonly BatchAsyncBatcherExecuteDelegate executor;
private readonly BatchAsyncBatcherRetryDelegate retrier;
private readonly int maxBatchByteSize;
private readonly int maxBatchOperationCount;
private long currentSize = 0;
private bool dispached = false;
ealsur marked this conversation as resolved.
Show resolved Hide resolved

public bool IsEmpty => this.batchOperations.Count == 0;

public BatchAsyncBatcher(
int maxBatchOperationCount,
int maxBatchByteSize,
CosmosSerializer cosmosSerializer,
BatchAsyncBatcherExecuteDelegate executor,
BatchAsyncBatcherRetryDelegate retrier)
{
if (maxBatchOperationCount < 1)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ArgumentOutOfRangeException(nameof(maxBatchOperationCount));
}

if (maxBatchByteSize < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxBatchByteSize));
}

if (executor == null)
{
throw new ArgumentNullException(nameof(executor));
}

if (retrier == null)
{
throw new ArgumentNullException(nameof(retrier));
}

if (cosmosSerializer == null)
{
throw new ArgumentNullException(nameof(cosmosSerializer));
}

this.batchOperations = new List<BatchAsyncOperationContext>(maxBatchOperationCount);
this.executor = executor;
this.retrier = retrier;
this.maxBatchByteSize = maxBatchByteSize;
this.maxBatchOperationCount = maxBatchOperationCount;
this.cosmosSerializer = cosmosSerializer;
}

public virtual bool TryAdd(BatchAsyncOperationContext operationContext)
{
if (this.dispached)
{
DefaultTrace.TraceCritical($"Add operation attempted on dispatched batch.");
return false;
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}

if (operationContext == null)
{
throw new ArgumentNullException(nameof(operationContext));
}

if (this.batchOperations.Count == this.maxBatchOperationCount)
{
DefaultTrace.TraceVerbose($"Batch is full - Max operation count {this.maxBatchOperationCount} reached.");
return false;
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}

int itemByteSize = operationContext.Operation.GetApproximateSerializedLength();

if (itemByteSize + this.currentSize > this.maxBatchByteSize)
{
DefaultTrace.TraceVerbose($"Batch is full - Max byte size {this.maxBatchByteSize} reached.");
ealsur marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

this.currentSize += itemByteSize;

// Operation index is in the scope of the current batch
operationContext.Operation.OperationIndex = this.batchOperations.Count;
operationContext.CurrentBatcher = this;
this.batchOperations.Add(operationContext);
return true;
}

public virtual async Task DispatchAsync(CancellationToken cancellationToken = default(CancellationToken))
{
PartitionKeyRangeServerBatchRequest serverRequest = await this.CreateServerRequestAsync(cancellationToken);
ealsur marked this conversation as resolved.
Show resolved Hide resolved
// Any overflow goes to a new batch
int overFlowOperations = this.GetOverflowOperations(serverRequest, this.batchOperations);
while (overFlowOperations > 0)
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved
{
await this.retrier(this.batchOperations[this.batchOperations.Count - overFlowOperations], cancellationToken);
overFlowOperations--;
}

try
{
PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken);

List<BatchResponse> responses = new List<BatchResponse>(result.ServerResponses.Count);
if (!result.ContainsSplit())
{
responses.AddRange(result.ServerResponses);
}
else
{
foreach (ItemBatchOperation operationToRetry in result.Operations)
{
await this.retrier(this.batchOperations[operationToRetry.OperationIndex], cancellationToken);
}
}

using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, responses, this.cosmosSerializer))
{
for (int index = 0; index < this.batchOperations.Count; index++)
{
BatchAsyncOperationContext context = this.batchOperations[index];
ealsur marked this conversation as resolved.
Show resolved Hide resolved
BatchOperationResult response = batchResponse[context.Operation.OperationIndex];
if (response != null)
{
context.Complete(this, response);
}
}
}
}
catch (Exception ex)
{
DefaultTrace.TraceError("Exception during BatchAsyncBatcher: {0}", ex);
// Exceptions happening during execution fail all the Tasks part of the request (excluding overflow)
foreach (ItemBatchOperation itemBatchOperation in serverRequest.Operations)
{
BatchAsyncOperationContext context = this.batchOperations[itemBatchOperation.OperationIndex];
context.Fail(this, ex);
}
}
finally
{
this.batchOperations.Clear();
this.dispached = true;
}
}

/// <summary>
/// If because of HybridRow serialization overhead, not all operations fit in the request, we send those extra operations in a separate request.
/// </summary>
internal virtual int GetOverflowOperations(
PartitionKeyRangeServerBatchRequest request,
IReadOnlyList<BatchAsyncOperationContext> operationsSentToRequest)
{
int totalOperations = operationsSentToRequest.Count;
return totalOperations - request.Operations.Count;
}

private async Task<PartitionKeyRangeServerBatchRequest> CreateServerRequestAsync(CancellationToken cancellationToken)
{
// All operations should be for the same PKRange
string partitionKeyRangeId = this.batchOperations[0].PartitionKeyRangeId;

ItemBatchOperation[] operations = new ItemBatchOperation[this.batchOperations.Count];
for (int i = 0; i < this.batchOperations.Count; i++)
{
operations[i] = this.batchOperations[i].Operation;
}

ArraySegment<ItemBatchOperation> operationsArraySegment = new ArraySegment<ItemBatchOperation>(operations);
PartitionKeyRangeServerBatchRequest request = await PartitionKeyRangeServerBatchRequest.CreateAsync(
partitionKeyRangeId,
operationsArraySegment,
this.maxBatchByteSize,
this.maxBatchOperationCount,
ensureContinuousOperationIndexes: false,
serializer: this.cosmosSerializer,
cancellationToken: cancellationToken).ConfigureAwait(false);

return request;
}
}

/// <summary>
/// Executor implementation that processes a list of operations.
/// </summary>
/// <returns>An instance of <see cref="PartitionKeyRangeBatchResponse"/>.</returns>
internal delegate Task<PartitionKeyRangeBatchExecutionResult> BatchAsyncBatcherExecuteDelegate(PartitionKeyRangeServerBatchRequest request, CancellationToken cancellationToken);

/// <summary>
/// Delegate to process a request for retry an operation
/// </summary>
/// <returns>An instance of <see cref="PartitionKeyRangeBatchResponse"/>.</returns>
internal delegate Task BatchAsyncBatcherRetryDelegate(BatchAsyncOperationContext operationContext, CancellationToken cancellationToken);
}
Loading