Bulk API congestion control #1074

Merged: 40 commits, Feb 25, 2020
Changes from 36 commits
Commits (40)
- 8f0e180 Bulk congestion control changes with perf optimizations (rakkuma, Dec 3, 2019)
- f4ea89e Resource stream null exception - To be reverted later (rakkuma, Dec 3, 2019)
- fa63dc0 Fix (rakkuma, Dec 10, 2019)
- 51c2134 Merging with master (rakkuma, Dec 10, 2019)
- 7a675ca fix (rakkuma, Dec 10, 2019)
- b42a8ee fix (rakkuma, Dec 10, 2019)
- 2cd6b00 Code review fix (rakkuma, Dec 11, 2019)
- c3cd1cb Fixing program.cs (rakkuma, Dec 12, 2019)
- 8ef1e99 correcting .json (rakkuma, Dec 13, 2019)
- 0b96a30 fix (rakkuma, Dec 13, 2019)
- d74df2e Code review fix (rakkuma, Dec 18, 2019)
- 0bf11cb Null check removing (rakkuma, Dec 18, 2019)
- ca55d83 Code review changes (rakkuma, Dec 23, 2019)
- 3c604c2 Minor fix (rakkuma, Dec 23, 2019)
- aa60e67 Fixing async (rakkuma, Dec 23, 2019)
- af2a3b7 Typo fix (rakkuma, Dec 23, 2019)
- f4d19c9 Merging with master (rakkuma, Dec 23, 2019)
- 2f3b29f Few fix and adding flag for congestion control (rakkuma, Dec 26, 2019)
- 9de509d Minor fixes (rakkuma, Jan 6, 2020)
- bbbd0e0 Merge branch 'master' into users/rakkuma/batch-api-congestion-control (rakkuma, Jan 13, 2020)
- 483437e Fixing contract (rakkuma, Jan 13, 2020)
- 9db1625 Moving congestion control logic to streamer (rakkuma, Jan 20, 2020)
- 8a5e75a Code review changes (rakkuma, Jan 20, 2020)
- 942cb87 Batcher test case fix (rakkuma, Jan 21, 2020)
- c5df4d9 Code review fix (rakkuma, Jan 22, 2020)
- 23ea17a Merging with master (rakkuma, Jan 27, 2020)
- 9c3a57f Adding unit test case (rakkuma, Jan 27, 2020)
- f453121 Fix (rakkuma, Jan 27, 2020)
- a172308 test case non flaky (rakkuma, Jan 27, 2020)
- c582eb7 Fixing assert (rakkuma, Jan 31, 2020)
- 1b41d41 Renaming of EnableCongestionControlForBulkExecution to EnableAdaptive… (rakkuma, Feb 5, 2020)
- 7c9fea8 Contract changes (rakkuma, Feb 6, 2020)
- 44e7a04 Flaky test case fix (rakkuma, Feb 6, 2020)
- 471d5de Code review changes (rakkuma, Feb 11, 2020)
- 2d9d109 Contract changes (rakkuma, Feb 12, 2020)
- 8b88985 Merge branch 'master' into users/rakkuma/batch-api-congestion-control (rakkuma, Feb 12, 2020)
- 21dd69f Reverting change for not exposing congestion control knob (rakkuma, Feb 14, 2020)
- 3bad759 Code review fix (rakkuma, Feb 21, 2020)
- 0f038fc Minor fix (rakkuma, Feb 21, 2020)
- 39c8061 Merge branch 'master' of https://github.com/Azure/azure-cosmos-dotnet… (rakkuma, Feb 21, 2020)
@@ -0,0 +1,13 @@
{
"EndPointUrl": "https://localhost:8081",
"AuthorizationKey": "Super secret key",
"DatabaseName": "samples",
"ContainerName": "bulk-support",
"CollectionThroughput": "100000",
"ShouldCleanupOnStart": "false",
"ShouldCleanupOnFinish": "false",
"ItemSize": "400",
"ItemsToCreate": "10000",
"RuntimeInSeconds": "300",
"numWorkers": "1"
}
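
For orientation, this appears to be the BulkSupport sample's new AppSettings.json (the csproj change below switches the copied file from ..\AppSettings.json to a local AppSettings.json). A minimal sketch of how such a file might be read with Microsoft.Extensions.Configuration follows; it assumes the Microsoft.Extensions.Configuration.Json package is also referenced, and the helper name is illustrative, not part of this PR.

using Microsoft.Extensions.Configuration;

internal static class SampleSettings
{
    // Sketch only: loads the JSON shown above and parses the numeric values,
    // which the sample stores as strings.
    public static (string Endpoint, string AuthKey, int ItemsToCreate, int ItemSize) Load()
    {
        IConfigurationRoot configuration = new ConfigurationBuilder()
            .AddJsonFile("AppSettings.json")
            .Build();

        return (
            Endpoint: configuration["EndPointUrl"],
            AuthKey: configuration["AuthorizationKey"],
            ItemsToCreate: int.Parse(configuration["ItemsToCreate"]),
            ItemSize: int.Parse(configuration["ItemSize"]));
    }
}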
@@ -1,11 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<AssemblyName>Cosmos.Samples.BulkSupport</AssemblyName>
<RootNamespace>Cosmos.Samples.BulkSupport</RootNamespace>
<LangVersion>latest</LangVersion>
<ServerGarbageCollection>true</ServerGarbageCollection>
Review comment (Member): @j82w how about capturing it part of user-agent for telemetry?

</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
@@ -18,7 +19,7 @@
<ProjectReference Include="..\Shared\Shared.csproj" />
</ItemGroup>
<ItemGroup>
<None Include="..\AppSettings.json">
<None Include="AppSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
316 changes: 226 additions & 90 deletions Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs

Large diffs are not rendered by default.

16 changes: 15 additions & 1 deletion Microsoft.Azure.Cosmos/src/Batch/BatchAsyncBatcher.cs
@@ -6,9 +6,12 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Documents;

/// <summary>
/// Maintains a batch of operations and dispatches it as a unit of work.
@@ -117,7 +120,9 @@ public virtual bool TryAdd(ItemBatchOperation operation)
return true;
}

public virtual async Task DispatchAsync(CancellationToken cancellationToken = default(CancellationToken))
public virtual async Task DispatchAsync(
BatchPartitionMetric partitionMetric,
CancellationToken cancellationToken = default(CancellationToken))
{
this.interlockIncrementCheck.EnterLockCheck();

@@ -151,7 +156,16 @@ public virtual bool TryAdd(ItemBatchOperation operation)

try
{
Stopwatch stopwatch = Stopwatch.StartNew();

PartitionKeyRangeBatchExecutionResult result = await this.executor(serverRequest, cancellationToken);

int numThrottle = result.ServerResponse.Any(r => r.StatusCode == (System.Net.HttpStatusCode)StatusCodes.TooManyRequests) ? 1 : 0;
partitionMetric.Add(
numberOfDocumentsOperatedOn: result.ServerResponse.Count,
timeTakenInMilliseconds: stopwatch.ElapsedMilliseconds,
numberOfThrottles: numThrottle);

using (PartitionKeyRangeBatchResponse batchResponse = new PartitionKeyRangeBatchResponse(serverRequest.Operations.Count, result.ServerResponse, this.serializerCore))
{
foreach (ItemBatchOperation itemBatchOperation in batchResponse.Operations)
13 changes: 10 additions & 3 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncContainerExecutor.cs
@@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Diagnostics;
@@ -39,6 +40,9 @@ internal class BatchAsyncContainerExecutor : IDisposable
private readonly TimerPool timerPool;
private readonly RetryOptions retryOptions;

private readonly int defaultMaxDegreeOfConcurrency = 50;
private readonly bool adaptiveBulkExecution;

/// <summary>
/// For unit testing.
/// </summary>
@@ -80,6 +84,7 @@ public BatchAsyncContainerExecutor(
this.dispatchTimerInSeconds = dispatchTimerInSeconds;
this.timerPool = new TimerPool(BatchAsyncContainerExecutor.MinimumDispatchTimerInSeconds);
this.retryOptions = cosmosClientContext.ClientOptions.GetConnectionPolicy().RetryOptions;
this.adaptiveBulkExecution = cosmosClientContext.ClientOptions.AdaptiveBulkExecution;
}

public virtual async Task<TransactionalBatchOperationResult> AddAsync(
@@ -259,16 +264,18 @@ private BatchAsyncStreamer GetOrAddStreamerForPartitionKeyRange(string partition
{
return streamer;
}

SemaphoreSlim limiter = this.GetOrAddLimiterForPartitionKeyRange(partitionKeyRangeId);
BatchAsyncStreamer newStreamer = new BatchAsyncStreamer(
this.maxServerRequestOperationCount,
this.maxServerRequestBodyLength,
this.dispatchTimerInSeconds,
this.timerPool,
limiter,
this.adaptiveBulkExecution,
this.defaultMaxDegreeOfConcurrency,
this.cosmosClientContext.SerializerCore,
this.ExecuteAsync,
this.ReBatchAsync);

if (!this.streamersByPartitionKeyRange.TryAdd(partitionKeyRangeId, newStreamer))
{
newStreamer.Dispose();
@@ -284,7 +291,7 @@ private SemaphoreSlim GetOrAddLimiterForPartitionKeyRange(string partitionKeyRan
return limiter;
}

SemaphoreSlim newLimiter = new SemaphoreSlim(1, 1);
SemaphoreSlim newLimiter = new SemaphoreSlim(1, this.defaultMaxDegreeOfConcurrency);
if (!this.limitersByPartitionkeyRange.TryAdd(partitionKeyRangeId, newLimiter))
{
newLimiter.Dispose();
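
The limiter change above is worth pausing on: the per-partition SemaphoreSlim now starts with a single available slot but allows its count to grow up to defaultMaxDegreeOfConcurrency (50), so a congestion controller can widen or narrow the window at runtime. This hunk does not show where the SDK actually acquires and releases the limiter, so the following is only a sketch of the general pattern such a semaphore supports; the type and method names are illustrative.

using System;
using System.Threading;
using System.Threading.Tasks;

internal sealed class PartitionLimiterSketch
{
    private const int MaxDegreeOfConcurrency = 50; // mirrors defaultMaxDegreeOfConcurrency above
    private readonly SemaphoreSlim limiter = new SemaphoreSlim(1, MaxDegreeOfConcurrency);

    // Each in-flight batch holds one slot for the duration of the call.
    public async Task DispatchAsync(Func<Task> sendBatchAsync, CancellationToken cancellationToken)
    {
        await this.limiter.WaitAsync(cancellationToken);
        try
        {
            await sendBatchAsync();
        }
        finally
        {
            this.limiter.Release();
        }
    }

    // A controller widens the window by releasing extra slots (bounded by the max)...
    public void Widen(int slots) => this.limiter.Release(slots);

    // ...and narrows it by taking slots it never gives back.
    public Task NarrowAsync(CancellationToken cancellationToken) => this.limiter.WaitAsync(cancellationToken);
}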
99 changes: 96 additions & 3 deletions Microsoft.Azure.Cosmos/src/Batch/BatchAsyncStreamer.cs
@@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query.Core.Metrics;
using Microsoft.Azure.Documents;

/// <summary>
@@ -28,16 +29,34 @@ internal class BatchAsyncStreamer : IDisposable
private readonly int dispatchTimerInSeconds;
private readonly CosmosSerializerCore serializerCore;
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

private readonly int congestionIncreaseFactor = 1;
private readonly int congestionControllerDelayInSeconds = 1;
private readonly int congestionDecreaseFactor = 5;
private readonly int maxDegreeOfConcurrency;

private volatile BatchAsyncBatcher currentBatcher;
private TimerPool timerPool;
private PooledTimer currentTimer;
private Task timerTask;

private PooledTimer congestionControlTimer;
private Task congestionControlTask;
private SemaphoreSlim limiter;

private int congestionDegreeOfConcurrency = 1;
private long congestionWaitTimeInMilliseconds = 1000;
private BatchPartitionMetric oldPartitionMetric;
private BatchPartitionMetric partitionMetric;

public BatchAsyncStreamer(
int maxBatchOperationCount,
int maxBatchByteSize,
int dispatchTimerInSeconds,
TimerPool timerPool,
SemaphoreSlim limiter,
bool adaptiveBulkExecution,
int maxDegreeOfConcurrency,
CosmosSerializerCore serializerCore,
BatchAsyncBatcherExecuteDelegate executor,
BatchAsyncBatcherRetryDelegate retrier)
@@ -80,8 +99,17 @@ public BatchAsyncStreamer(
this.timerPool = timerPool;
this.serializerCore = serializerCore;
this.currentBatcher = this.CreateBatchAsyncBatcher();

this.ResetTimer();

this.limiter = limiter;
this.oldPartitionMetric = new BatchPartitionMetric();
this.partitionMetric = new BatchPartitionMetric();
this.maxDegreeOfConcurrency = maxDegreeOfConcurrency;

if (adaptiveBulkExecution)
{
this.StartCongestionControlTimer();
}
}

public void Add(ItemBatchOperation operation)
Expand All @@ -99,17 +127,25 @@ public void Add(ItemBatchOperation operation)
if (toDispatch != null)
{
// Discarded for Fire & Forget
_ = toDispatch.DispatchAsync(this.cancellationTokenSource.Token);
_ = toDispatch.DispatchAsync(this.partitionMetric, this.cancellationTokenSource.Token);
}
}

public void Dispose()
{
this.cancellationTokenSource.Cancel();
this.cancellationTokenSource.Dispose();

this.currentTimer.CancelTimer();
this.currentTimer = null;
this.timerTask = null;

if (this.congestionControlTimer != null)
{
this.congestionControlTimer.CancelTimer();
this.congestionControlTimer = null;
this.congestionControlTask = null;
}
}

private void ResetTimer()
@@ -121,6 +157,15 @@ private void ResetTimer()
}, this.cancellationTokenSource.Token);
}

private void StartCongestionControlTimer()
{
this.congestionControlTimer = this.timerPool.GetPooledTimer(this.congestionControllerDelayInSeconds);
this.congestionControlTask = this.congestionControlTimer.StartTimerAsync().ContinueWith(async (task) =>
{
await this.RunCongestionControlAsync();
}, this.cancellationTokenSource.Token);
}

private void DispatchTimer()
{
if (this.cancellationTokenSource.IsCancellationRequested)
Expand All @@ -137,7 +182,7 @@ private void DispatchTimer()
if (toDispatch != null)
{
// Discarded for Fire & Forget
_ = toDispatch.DispatchAsync(this.cancellationTokenSource.Token);
_ = toDispatch.DispatchAsync(this.partitionMetric, this.cancellationTokenSource.Token);
}

this.ResetTimer();
@@ -159,5 +204,53 @@ private BatchAsyncBatcher CreateBatchAsyncBatcher()
{
return new BatchAsyncBatcher(this.maxBatchOperationCount, this.maxBatchByteSize, this.serializerCore, this.executor, this.retrier);
}

private async Task RunCongestionControlAsync()
{
while (!this.cancellationTokenSource.Token.IsCancellationRequested)
{
long elapsedTimeInMilliseconds = this.partitionMetric.TimeTakenInMilliseconds - this.oldPartitionMetric.TimeTakenInMilliseconds;

if (elapsedTimeInMilliseconds >= this.congestionWaitTimeInMilliseconds)
{
long diffThrottle = this.partitionMetric.NumberOfThrottles - this.oldPartitionMetric.NumberOfThrottles;
long changeDocCount = this.partitionMetric.NumberOfDocumentsOperatedOn - this.oldPartitionMetric.NumberOfDocumentsOperatedOn;
this.oldPartitionMetric.Add(changeDocCount, elapsedTimeInMilliseconds, diffThrottle);

if (diffThrottle > 0)
{
// Decrease should not lead to degreeOfConcurrency 0 as this will just block the thread here and no one would release it.
int decreaseCount = Math.Min(this.congestionDecreaseFactor, this.congestionDegreeOfConcurrency / 2);

// We got a throttle so we need to back off on the degree of concurrency.
for (int i = 0; i < decreaseCount; i++)
{
await this.limiter.WaitAsync(this.cancellationTokenSource.Token);
}

this.congestionDegreeOfConcurrency -= decreaseCount;

// In case of throttling, increase the wait time so that the degree of concurrency can converge to a sustainable maximum.
this.congestionWaitTimeInMilliseconds += 1000;
}

if (changeDocCount > 0 && diffThrottle == 0)
{
if (this.congestionDegreeOfConcurrency + this.congestionIncreaseFactor <= this.maxDegreeOfConcurrency)
{
// We aren't getting throttles, so we should bump up the degree of concurrency.
this.limiter.Release(this.congestionIncreaseFactor);
this.congestionDegreeOfConcurrency += this.congestionIncreaseFactor;
}
}
}
else
{
break;
}
}

this.StartCongestionControlTimer();
}
}
}
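
Putting the whole loop in one place: every congestionControllerDelayInSeconds the streamer compares the current BatchPartitionMetric against the previous snapshot; any throttles in the window shrink the degree of concurrency (by at most congestionDecreaseFactor, and never below half of its current value) and lengthen the observation window, while a throttle-free window with progress grows it by congestionIncreaseFactor up to the cap. The sketch below restates just that arithmetic as standalone C#; the class and method names are illustrative, and the real implementation above additionally moves permits in and out of the shared SemaphoreSlim.

using System;

internal sealed class CongestionPolicySketch
{
    private const int IncreaseFactor = 1;            // congestionIncreaseFactor
    private const int DecreaseFactor = 5;            // congestionDecreaseFactor
    private const int MaxDegreeOfConcurrency = 50;   // defaultMaxDegreeOfConcurrency

    public int DegreeOfConcurrency { get; private set; } = 1;
    public long WaitTimeInMilliseconds { get; private set; } = 1000;

    // deltaDocs/deltaThrottles are the differences between two BatchPartitionMetric snapshots.
    public void Adjust(long deltaDocs, long deltaThrottles)
    {
        if (deltaThrottles > 0)
        {
            // Back off, but never all the way to zero concurrency.
            int decrease = Math.Min(DecreaseFactor, this.DegreeOfConcurrency / 2);
            this.DegreeOfConcurrency -= decrease;

            // Observe for longer before the next decision.
            this.WaitTimeInMilliseconds += 1000;
        }
        else if (deltaDocs > 0 && this.DegreeOfConcurrency + IncreaseFactor <= MaxDegreeOfConcurrency)
        {
            // Progress with no throttles: probe upward additively.
            this.DegreeOfConcurrency += IncreaseFactor;
        }
    }
}

Feeding this with a throttle-free window followed by a throttled one shows the asymmetry: concurrency climbs one slot per healthy interval but can drop by up to five slots (never below half) as soon as 429s appear in the window.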
90 changes: 90 additions & 0 deletions Microsoft.Azure.Cosmos/src/Batch/BatchPartitionMetric.cs
@@ -0,0 +1,90 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;

internal sealed class BatchPartitionMetric
{
public BatchPartitionMetric()
: this(0, 0, 0)
{
}

/// <summary>
/// Initializes a new instance of the BatchPartitionMetric class.
/// </summary>
/// <param name="numberOfDocumentsOperatedOn">Number of documents operated on.</param>
/// <param name="timeTakenInMilliseconds">Amount of time taken to insert the documents.</param>
/// <param name="numberOfThrottles">The number of throttles encountered to insert the documents.</param>
public BatchPartitionMetric(long numberOfDocumentsOperatedOn, long timeTakenInMilliseconds, long numberOfThrottles)
{
if (numberOfDocumentsOperatedOn < 0)
{
throw new ArgumentException("numberOfDocumentsOperatedOn must be non negative");
}

if (timeTakenInMilliseconds < 0)
{
throw new ArgumentException("timeTakenInMilliseconds must be non negative");
}

if (numberOfThrottles < 0)
{
throw new ArgumentException("numberOfThrottles must be non negative");
}

this.NumberOfDocumentsOperatedOn = numberOfDocumentsOperatedOn;
this.TimeTakenInMilliseconds = timeTakenInMilliseconds;
this.NumberOfThrottles = numberOfThrottles;
}

/// <summary>
/// Gets the number of documents operated on.
/// </summary>
public long NumberOfDocumentsOperatedOn
{
get; private set;
}

/// <summary>
/// Gets the time taken to operate on the documents.
/// </summary>
public long TimeTakenInMilliseconds
{
get; private set;
}

/// <summary>
/// Gets the number of throttles incurred while operating on the documents.
/// </summary>
public long NumberOfThrottles
{
get; private set;
}

public void Add(long numberOfDocumentsOperatedOn, long timeTakenInMilliseconds, long numberOfThrottles)
{
if (numberOfDocumentsOperatedOn < 0)
{
throw new ArgumentException("numberOfDocumentsOperatedOn must be non negative");
}

if (timeTakenInMilliseconds < 0)
{
throw new ArgumentException("timeTakenInMilliseconds must be non negative");
}

if (numberOfThrottles < 0)
{
throw new ArgumentException("numberOfThrottles must be non negative");
}

this.NumberOfDocumentsOperatedOn += numberOfDocumentsOperatedOn;
this.TimeTakenInMilliseconds += timeTakenInMilliseconds;
this.NumberOfThrottles += numberOfThrottles;
}
}
}
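
A small usage note on the counters above: the streamer keeps two instances, a live partitionMetric that batchers add to after each dispatch and an oldPartitionMetric snapshot that is advanced by the computed deltas once a decision has been made. The class is internal to the SDK, so the lines below are illustrative only, with names local to this example.

// Illustrative only: take deltas between the live metric and the last snapshot,
// then advance the snapshot by exactly those deltas.
BatchPartitionMetric live = new BatchPartitionMetric();
BatchPartitionMetric snapshot = new BatchPartitionMetric();

live.Add(numberOfDocumentsOperatedOn: 100, timeTakenInMilliseconds: 1200, numberOfThrottles: 1);

long deltaDocs = live.NumberOfDocumentsOperatedOn - snapshot.NumberOfDocumentsOperatedOn;   // 100
long deltaTime = live.TimeTakenInMilliseconds - snapshot.TimeTakenInMilliseconds;           // 1200
long deltaThrottles = live.NumberOfThrottles - snapshot.NumberOfThrottles;                  // 1

snapshot.Add(deltaDocs, deltaTime, deltaThrottles); // snapshot now matches everything observed so far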