Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk stream support #585

Merged
merged 44 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
813f193
migrating base classes
ealsur Jul 23, 2019
34f7653
Adding tests
ealsur Jul 23, 2019
643c082
Adding executor tests
ealsur Jul 23, 2019
d272a61
Applying master changes
ealsur Jul 23, 2019
e2add6e
merge with master
ealsur Jul 29, 2019
03478eb
Comments
ealsur Jul 29, 2019
3a84cbf
Removing some blocking calls
ealsur Jul 29, 2019
7dc7870
Removing more blocking calls
ealsur Jul 29, 2019
ef25381
Removing final blocking calls
ealsur Jul 29, 2019
2c4332d
Missing continuation token
ealsur Jul 29, 2019
dcbac64
Timer dispatch
ealsur Jul 29, 2019
121c4b3
Async Add
ealsur Jul 29, 2019
f966abb
ConfigureAwait
ealsur Jul 30, 2019
f28cd51
Refactoring limiters
ealsur Jul 30, 2019
5ed005c
Fixing test
ealsur Jul 30, 2019
e696741
Addressing comments
ealsur Aug 1, 2019
9e75160
pending tasks
ealsur Aug 1, 2019
9e1a0df
Removing lock
ealsur Aug 1, 2019
0122a62
Adding Dispose UTs
ealsur Aug 1, 2019
30e71de
Pushing retry to batcher
ealsur Aug 1, 2019
8e968fd
Missing using
ealsur Aug 1, 2019
7eed1c2
Removing unnecessary reprocesing of PKRange
ealsur Aug 5, 2019
80616bc
New UTs for Executor
ealsur Aug 5, 2019
639b577
merge with master
ealsur Aug 6, 2019
82d08f6
Merge branch 'master' into users/ealsur/bulkstream
ealsur Aug 7, 2019
133fb9e
Support partial splits
ealsur Aug 7, 2019
f951940
volatile
ealsur Aug 7, 2019
681b64d
removing
ealsur Aug 7, 2019
4dd74b9
Semaphore wrapper
ealsur Aug 7, 2019
8d09157
Spelling
ealsur Aug 7, 2019
ec1ba7c
Refactoring to requeue failed operations
ealsur Aug 7, 2019
cdf0ec6
Refactoring
ealsur Aug 8, 2019
7335a82
Reducing semaphores and fixing bug
ealsur Aug 9, 2019
3146612
Rename
ealsur Aug 9, 2019
c5e17b3
Dispatch outside
ealsur Aug 9, 2019
bd25f19
Variable rename
ealsur Aug 12, 2019
a000f0c
Reducing memory footprint
ealsur Aug 12, 2019
784f3b7
Test
ealsur Aug 12, 2019
fda8fbd
Comments
ealsur Aug 14, 2019
15154de
Batcher refactor
ealsur Aug 15, 2019
47184fd
Adding interlock checker
ealsur Aug 16, 2019
cc8a483
Refactoring itemoperation and context
ealsur Aug 16, 2019
974eaef
New tests
ealsur Aug 16, 2019
3a6ce27
Addressing comments
ealsur Aug 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 141 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Maintains a batch of operations and dispatches the batch through an executor. Maps results into the original operation contexts.
ealsur marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
/// <seealso cref="BatchAsyncOperationContext"/>
internal class BatchAsyncBatcher : IDisposable
{
private readonly SemaphoreSlim tryAddLimiter;
ealsur marked this conversation as resolved.
Show resolved Hide resolved
private readonly CosmosSerializer CosmosSerializer;
private readonly List<BatchAsyncOperationContext> batchOperations;
private readonly Func<IReadOnlyList<BatchAsyncOperationContext>, CancellationToken, Task<PartitionKeyBatchResponse>> executor;
ealsur marked this conversation as resolved.
Show resolved Hide resolved
private readonly int maxBatchByteSize;
private readonly int maxBatchOperationCount;
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private long currentSize = 0;

public bool IsEmpty => this.batchOperations.Count == 0;

public BatchAsyncBatcher(
int maxBatchOperationCount,
int maxBatchByteSize,
CosmosSerializer cosmosSerializer,
Func<IReadOnlyList<BatchAsyncOperationContext>, CancellationToken, Task<PartitionKeyBatchResponse>> executor)
{
if (maxBatchOperationCount < 1)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ArgumentOutOfRangeException(nameof(maxBatchOperationCount));
}

if (maxBatchByteSize < 1)
{
throw new ArgumentOutOfRangeException(nameof(maxBatchByteSize));
}

if (executor == null)
{
throw new ArgumentNullException(nameof(executor));
}

if (cosmosSerializer == null)
{
throw new ArgumentNullException(nameof(cosmosSerializer));
}

this.batchOperations = new List<BatchAsyncOperationContext>(maxBatchOperationCount);
this.tryAddLimiter = new SemaphoreSlim(1, 1);
this.executor = executor;
this.maxBatchByteSize = maxBatchByteSize;
this.maxBatchOperationCount = maxBatchOperationCount;
this.CosmosSerializer = cosmosSerializer;
}

public async Task<bool> TryAddAsync(BatchAsyncOperationContext batchAsyncOperation)
{
if (batchAsyncOperation == null)
{
throw new ArgumentNullException(nameof(batchAsyncOperation));
}

await this.tryAddLimiter.WaitAsync(this.cancellationTokenSource.Token);
try
{
if (this.batchOperations.Count == this.maxBatchOperationCount)
{
return false;
}

int itemByteSize = batchAsyncOperation.Operation.GetApproximateSerializedLength();

if (itemByteSize + this.currentSize > this.maxBatchByteSize)
{
return false;
}

this.currentSize += itemByteSize;

// Operation index is in the scope of the current batch
batchAsyncOperation.Operation.OperationIndex = this.batchOperations.Count;
this.batchOperations.Add(batchAsyncOperation);
return true;
}
finally
{
this.tryAddLimiter.Release();
}
}

public async Task DispatchAsync(CancellationToken cancellationToken = default(CancellationToken))
{
try
{
using (PartitionKeyBatchResponse batchResponse = await this.executor(this.batchOperations, cancellationToken))
{
// If the batch was not successful, we need to set all the responses
if (!batchResponse.IsSuccessStatusCode)
{
BatchOperationResult errorResult = new BatchOperationResult(batchResponse.StatusCode);
foreach (BatchAsyncOperationContext operation in this.batchOperations)
{
operation.Complete(errorResult);
}

return;
}

for (int index = 0; index < this.batchOperations.Count; index++)
{
BatchAsyncOperationContext operation = this.batchOperations[index];
BatchOperationResult response = batchResponse[index];
operation.Complete(response);
}
}
}
catch (Exception ex)
{
// Exceptions happening during execution fail all the Tasks
foreach (BatchAsyncOperationContext operation in this.batchOperations)
{
operation.Fail(ex);
}
}
}

public void Dispose()
{
this.cancellationTokenSource.Cancel();
this.cancellationTokenSource.Dispose();
this.tryAddLimiter.Dispose();
}
}
}
Loading