Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk API congestion control #1074

Merged
merged 40 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
8f0e180
Bulk congestion control changes with perf optimizations
rakkuma Dec 3, 2019
f4ea89e
Resource stream null exception - To be reverted later
rakkuma Dec 3, 2019
fa63dc0
Fix
rakkuma Dec 10, 2019
51c2134
Merging with master
rakkuma Dec 10, 2019
7a675ca
fix
rakkuma Dec 10, 2019
b42a8ee
fix
rakkuma Dec 10, 2019
2cd6b00
Code review fix
rakkuma Dec 11, 2019
c3cd1cb
Fixing program.cs
rakkuma Dec 12, 2019
8ef1e99
correcting .json
rakkuma Dec 13, 2019
0b96a30
fix
rakkuma Dec 13, 2019
d74df2e
Code review fix
rakkuma Dec 18, 2019
0bf11cb
Null check removing
rakkuma Dec 18, 2019
ca55d83
Code review changes
rakkuma Dec 23, 2019
3c604c2
Minor fix
rakkuma Dec 23, 2019
aa60e67
Fixing async
rakkuma Dec 23, 2019
af2a3b7
Typo fix
rakkuma Dec 23, 2019
f4d19c9
Merging with master
rakkuma Dec 23, 2019
2f3b29f
Few fix and adding flag for congestion control
rakkuma Dec 26, 2019
9de509d
Minor fixes
rakkuma Jan 6, 2020
bbbd0e0
Merge branch 'master' into users/rakkuma/batch-api-congestion-control
rakkuma Jan 13, 2020
483437e
Fixing contract
rakkuma Jan 13, 2020
9db1625
Moving congestion control logic to streamer
rakkuma Jan 20, 2020
8a5e75a
Code review changes
rakkuma Jan 20, 2020
942cb87
Batcher test case fix
rakkuma Jan 21, 2020
c5df4d9
Code review fix
rakkuma Jan 22, 2020
23ea17a
Merging with master
rakkuma Jan 27, 2020
9c3a57f
Adding unit test case
rakkuma Jan 27, 2020
f453121
Fix
rakkuma Jan 27, 2020
a172308
test case non flaky
rakkuma Jan 27, 2020
c582eb7
Fixing assert
rakkuma Jan 31, 2020
1b41d41
Renaming of EnableCongestionControlForBulkExecution to EnableAdaptive…
rakkuma Feb 5, 2020
7c9fea8
Contract changes
rakkuma Feb 6, 2020
44e7a04
Flaky test case fix
rakkuma Feb 6, 2020
471d5de
Code review changes
rakkuma Feb 11, 2020
2d9d109
Contract changes
rakkuma Feb 12, 2020
8b88985
Merge branch 'master' into users/rakkuma/batch-api-congestion-control
rakkuma Feb 12, 2020
21dd69f
Reverting change for not exposing congestion control knob
rakkuma Feb 14, 2020
3bad759
Code review fix
rakkuma Feb 21, 2020
0f038fc
Minor fix
rakkuma Feb 21, 2020
39c8061
Merge branch 'master' of https://github.com/Azure/azure-cosmos-dotnet…
rakkuma Feb 21, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"EndPointUrl": "https://localhost:8081",
"AuthorizationKey": "Super secret key",
"DatabaseName": "samples",
"CollectionName": "bulk-support",
"CollectionThroughput": "100000",
"ShouldCleanupOnStart": "false",
"ShouldCleanupOnFinish": "false",
"DocumentSize": "1024",
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
"PreCreatedDocuments": "100000",
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
"RuntimeInSeconds": "30"
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
<AssemblyName>Cosmos.Samples.BulkSupport</AssemblyName>
<RootNamespace>Cosmos.Samples.BulkSupport</RootNamespace>
<LangVersion>latest</LangVersion>
<ServerGarbageCollection>true</ServerGarbageCollection>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@j82w how about capturing it part of user-agent for telemetry?

<ThreadPoolMinThreads>128</ThreadPoolMinThreads>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" />
Expand All @@ -18,7 +20,7 @@
<ProjectReference Include="..\Shared\Shared.csproj" />
</ItemGroup>
<ItemGroup>
<None Include="..\AppSettings.json">
<None Include="AppSettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
Expand Down
284 changes: 188 additions & 96 deletions Microsoft.Azure.Cosmos.Samples/Usage/BulkSupport/Program.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
namespace Cosmos.Samples.Shared
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
Expand All @@ -24,47 +27,23 @@

public class Program
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
{
private const int concurrentWorkers = 3;
private const int concurrentDocuments = 100;
private const string databaseId = "samples";
private const string containerId = "bulk-support";
private static readonly JsonSerializer Serializer = new JsonSerializer();

//Reusable instance of ItemClient which represents the connection to a Cosmos endpoint
private static Database database = null;
private static int preCreatedDocuments;
private static int documentSize;
private static int runtimeInSeconds;
private static bool shouldCleanupOnFinish;

// Async main requires c# 7.1 which is set in the csproj with the LangVersion attribute
// <Main>
public static async Task Main(string[] args)
public static void Main(string[] args)
{
try
{
// Read the Cosmos endpointUrl and authorisationKeys from configuration
// These values are available from the Azure Management Portal on the Cosmos Account Blade under "Keys"
// Keep these values in a safe & secure location. Together they provide Administrative access to your Cosmos account
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();
// Intialize container or create a new container.
Container container = Program.Initalizer();
rakkuma marked this conversation as resolved.
Show resolved Hide resolved

string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json");
}

string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}

CosmosClient bulkClient = Program.GetBulkClientInstance(endpoint, authKey);
// Create the require container, can be done with any client
await Program.InitializeAsync(bulkClient);
ealsur marked this conversation as resolved.
Show resolved Hide resolved

Console.WriteLine("Running demo with a Bulk enabled CosmosClient...");
// Execute inserts for 30 seconds on a Bulk enabled client
await Program.CreateItemsConcurrentlyAsync(bulkClient);
// Running bulk ingestion on a container.
Program.CreateItemsConcurrently(container);
}
catch (CosmosException cre)
{
Expand All @@ -77,71 +56,61 @@ public static async Task Main(string[] args)
}
finally
{
await Program.CleanupAsync();
if (Program.shouldCleanupOnFinish)
{
CleanupAsync();
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
}
Console.WriteLine("End of demo, press any key to exit.");
Console.ReadKey();
}
}
// </Main>

private static CosmosClient GetBulkClientInstance(
string endpoint,
string authKey) =>
// </Initialization>
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true } );
// </Initialization>

private static async Task CreateItemsConcurrentlyAsync(CosmosClient client)
private static void CreateItemsConcurrently(Container container)
{
// Create concurrent workers that will insert items for 30 seconds
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(30000);
CancellationToken cancellationToken = cancellationTokenSource.Token;

Container container = client.GetContainer(Program.databaseId, Program.containerId);
List<Task<int>> workerTasks = new List<Task<int>>(Program.concurrentWorkers);
Console.WriteLine($"Initiating process with {Program.concurrentWorkers} worker threads writing groups of {Program.concurrentDocuments} items for 30 seconds.");
for (var i = 0; i < Program.concurrentWorkers; i++)
{
workerTasks.Add(CreateItemsAsync(container, cancellationToken));
}
Console.WriteLine($"Initiating creates of items of about {documentSize} bytes " +
$"maintaining {preCreatedDocuments} in-progress items for {runtimeInSeconds} seconds.");
rakkuma marked this conversation as resolved.
Show resolved Hide resolved

await Task.WhenAll(workerTasks);
Console.WriteLine($"Inserted {workerTasks.Sum(task => task.Result)} items.");
}
ConcurrentDictionary<HttpStatusCode, int> countsByStatus = new ConcurrentDictionary<HttpStatusCode, int>();
DataSource dataSource = new DataSource(preCreatedDocuments, documentSize);

private static async Task<int> CreateItemsAsync(
Container container,
CancellationToken cancellationToken)
{
int itemsCreated = 0;
string partitionKeyValue = Guid.NewGuid().ToString();
while (!cancellationToken.IsCancellationRequested)
Console.WriteLine("Starting job");
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(runtimeInSeconds * 1000);
CancellationToken cancellationToken = cancellationTokenSource.Token;
try
{
List<Task> tasks = new List<Task>(Program.concurrentDocuments);
for (int i = 0; i < Program.concurrentDocuments; i++)
while (!cancellationToken.IsCancellationRequested)
{
string id = Guid.NewGuid().ToString();
MyDocument myDocument = new MyDocument() { id = id, pk = partitionKeyValue };
tasks.Add(
container.CreateItemAsync<MyDocument>(myDocument, new PartitionKey(partitionKeyValue))
.ContinueWith((Task<ItemResponse<MyDocument>> task) =>
MemoryStream stream = dataSource.GetNextDocItem(out PartitionKey partitionKeyValue);
_ = container.CreateItemStreamAsync(stream, partitionKeyValue, null, cancellationToken)
.ContinueWith((Task<ResponseMessage> task) =>
{
if (!task.IsCompletedSuccessfully)
if (task.IsCompleted)
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
{
AggregateException innerExceptions = task.Exception.Flatten();
CosmosException cosmosException = innerExceptions.InnerExceptions.FirstOrDefault(innerEx => innerEx is CosmosException) as CosmosException;
Console.WriteLine($"Item {myDocument.id} failed with status code {cosmosException.StatusCode}");
if(stream != null) { stream.Dispose(); }
HttpStatusCode resultCode = task.Result.StatusCode;
countsByStatus.AddOrUpdate(resultCode, 1, (_, old) => old + 1);
if (task.Result != null) { task.Result.Dispose(); }
}
}));
task.Dispose();
});
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
}
finally
{
foreach (var countForStatus in countsByStatus)
{
Console.WriteLine(countForStatus.Key + " " + countForStatus.Value);
}

await Task.WhenAll(tasks);

itemsCreated += tasks.Count(task => task.IsCompletedSuccessfully);
}

return itemsCreated;

int created = countsByStatus.SingleOrDefault(x => x.Key == HttpStatusCode.Created).Value;
Console.WriteLine($"Inserted {created} items.");
}

// <Model>
Expand All @@ -151,35 +120,109 @@ private class MyDocument

public string pk { get; set; }

public bool Updated { get; set; }
public string other { get; set; }
}
// </Model>

private static async Task CleanupAsync()
private static Container Initalizer()
{
// Read the Cosmos endpointUrl and authorisationKeys from configuration
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
// These values are available from the Azure Management Portal on the Cosmos Account Blade under "Keys"
// Keep these values in a safe & secure location. Together they provide Administrative access to your Cosmos account
IConfigurationRoot configuration = new ConfigurationBuilder()
.AddJsonFile("appSettings.json")
.Build();

string endpoint = configuration["EndPointUrl"];
if (string.IsNullOrEmpty(endpoint))
{
throw new ArgumentNullException("Please specify a valid endpoint in the appSettings.json");
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
}

string authKey = configuration["AuthorizationKey"];
if (string.IsNullOrEmpty(authKey) || string.Equals(authKey, "Super secret key"))
{
throw new ArgumentException("Please specify a valid AuthorizationKey in the appSettings.json");
}

string DatabaseName = configuration["DatabaseName"];
if (string.IsNullOrEmpty(DatabaseName))
{
throw new ArgumentException("Please specify a valid DatabaseName in the appSettings.json");
}

string CollectionName = configuration["CollectionName"];
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
if (string.IsNullOrEmpty(CollectionName))
{
throw new ArgumentException("Please specify a valid CollectionName in the appSettings.json");
}

Program.preCreatedDocuments = int.Parse(string.IsNullOrEmpty(configuration["PreCreatedDocuments"]) ? "1000" : configuration["PreCreatedDocuments"]);
Program.documentSize = int.Parse(string.IsNullOrEmpty(configuration["DocumentSize"]) ? "1024" : configuration["DocumentSize"]);
Program.runtimeInSeconds = int.Parse(string.IsNullOrEmpty(configuration["RuntimeInSeconds"]) ? "30" : configuration["RuntimeInSeconds"]);

Program.shouldCleanupOnFinish = bool.Parse(string.IsNullOrEmpty(configuration["ShouldCleanupOnFinish"]) ? "false" : configuration["ShouldCleanupOnFinish"]);
bool shouldCleanupOnStart = bool.Parse(string.IsNullOrEmpty(configuration["ShouldCleanupOnStart"]) ? "false" : configuration["ShouldCleanupOnStart"]);
int collectionThroughput = int.Parse(string.IsNullOrEmpty(configuration["CollectionThroughput"]) ? "30000" : configuration["CollectionThroughput"]);

CosmosClient client = GetBulkClientInstance(endpoint, authKey);
Program.database = client.GetDatabase(DatabaseName);
Container container = Program.database.GetContainer(CollectionName); ;
if (shouldCleanupOnStart)
{
container = CreateFreshCollection(client, DatabaseName, CollectionName, collectionThroughput);
}

try
{
container.ReadContainerAsync().GetAwaiter().GetResult();
}
catch(Exception ex)
{
Console.WriteLine("Error in reading collection: {0}", ex.Message);
throw ex;
}

Console.WriteLine("Running demo for container {0} with a Bulk enabled CosmosClient.", CollectionName);

return container;
}

private static CosmosClient GetBulkClientInstance(
string endpoint,
string authKey) =>
// </Initialization>
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
// </Initialization>

private static void CleanupAsync()
{
if (Program.database != null)
{
await Program.database.DeleteAsync();
Program.database.DeleteAsync().GetAwaiter().GetResult();
}
}

private static async Task InitializeAsync(CosmosClient client)
private static Container CreateFreshCollection(CosmosClient client, string datbaseName, string collectionName, int throughput)
{
Program.database = await client.CreateDatabaseIfNotExistsAsync(Program.databaseId);
Program.database = client.CreateDatabaseIfNotExistsAsync(datbaseName).Result;
rakkuma marked this conversation as resolved.
Show resolved Hide resolved

// Delete the existing container to prevent create item conflicts
using (await database.GetContainer(containerId).DeleteContainerStreamAsync())
{ }
try
{
Console.WriteLine("Deleting old collection if it exists.");
database.GetContainer(collectionName).DeleteContainerStreamAsync().GetAwaiter().GetResult();
}
catch(Exception) {
// Do nothing
}

// We create a partitioned collection here which needs a partition key. Partitioned collections
// can be created with very high values of provisioned throughput (up to Throughput = 250,000)
// and used to store up to 250 GB of data.
Console.WriteLine("The demo will create a 20000 RU/s container, press any key to continue.");
// can be created with very high values of provisioned throughput and used to store 100's of GBs of data.
Console.WriteLine($"The demo will create a {throughput} RU/s container, press any key to continue.");
Console.ReadKey();

// Create with a throughput of 20000 RU/s - this demo is about throughput so it needs a higher degree of RU/s to show volume
// Indexing Policy to exclude all attributes to maximize RU/s usage
await database.DefineContainer(containerId, "/pk")
Container container = database.DefineContainer(collectionName, "/pk")
.WithIndexingPolicy()
.WithIndexingMode(IndexingMode.Consistent)
.WithIncludedPaths()
Expand All @@ -188,7 +231,56 @@ await database.DefineContainer(containerId, "/pk")
.Path("/*")
.Attach()
.Attach()
.CreateAsync(20000);
.CreateAsync(throughput).Result;

return container;
}

private class DataSource
{
private int docSize;
private static Stack<KeyValuePair<PartitionKey, MemoryStream>> documentsToImportInBatch;

public DataSource(int initialPoolSize, int docSize)
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
{
this.docSize = docSize;
documentsToImportInBatch = new Stack<KeyValuePair<PartitionKey, MemoryStream>>();

Stack<KeyValuePair<PartitionKey, MemoryStream>> stk = new Stack<KeyValuePair<PartitionKey, MemoryStream>>();
for (int j = 0; j < initialPoolSize; j++)
{
MemoryStream value = CreateNextDocItem(out PartitionKey partitionKeyValue);
documentsToImportInBatch.Push(new KeyValuePair<PartitionKey, MemoryStream>(partitionKeyValue, value));
}
}

private MemoryStream CreateNextDocItem(out PartitionKey partitionKeyValue)
{
string partitionKey = Guid.NewGuid().ToString();
string id = Guid.NewGuid().ToString();
string padding = docSize > 300 ? new string('x', docSize - 300) : string.Empty;
MyDocument myDocument = new MyDocument() { id = id, pk = partitionKey, other = padding };
string value = JsonConvert.SerializeObject(myDocument);
partitionKeyValue = new PartitionKey(partitionKey);

return new MemoryStream(Encoding.UTF8.GetBytes(value)); ;
}

public MemoryStream GetNextDocItem(out PartitionKey partitionKeyValue)
{
if (documentsToImportInBatch.Count > 0)
{
var pair = documentsToImportInBatch.Pop();
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
partitionKeyValue = pair.Key;
return pair.Value;
}
else
{
var value = CreateNextDocItem(out PartitionKey pkValue);
rakkuma marked this conversation as resolved.
Show resolved Hide resolved
partitionKeyValue = pkValue;
return value;
}
}
}
}
}
Expand Down
Loading