Skip to content

Commit

Permalink
Bulk executor cache (#857)
Browse files Browse the repository at this point in the history
* factory

* As client cache

* tests

* unrelated contract change

* Check for no-op

* Removing dispose

* Using AsyncCache

* Text

* Reintroducing cache to avoid async locks

* cleanup
  • Loading branch information
ealsur authored and kirankumarkolli committed Oct 3, 2019
1 parent 8cf4234 commit efc5c78
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Microsoft.Azure.Documents;

/// <summary>
/// Cache to create and share Executor instances across the client's lifetime.
/// </summary>
internal class BatchAsyncContainerExecutorCache : IDisposable
{
private ConcurrentDictionary<string, BatchAsyncContainerExecutor> executorsPerContainer = new ConcurrentDictionary<string, BatchAsyncContainerExecutor>();

public BatchAsyncContainerExecutor GetExecutorForContainer(
ContainerCore container,
CosmosClientContext cosmosClientContext)
{
if (!cosmosClientContext.ClientOptions.AllowBulkExecution)
{
throw new InvalidOperationException("AllowBulkExecution is not currently enabled.");
}

string containerLink = container.LinkUri.ToString();
if (this.executorsPerContainer.TryGetValue(containerLink, out BatchAsyncContainerExecutor executor))
{
return executor;
}

BatchAsyncContainerExecutor newExecutor = new BatchAsyncContainerExecutor(
container,
cosmosClientContext,
Constants.MaxOperationsInDirectModeBatchRequest,
Constants.MaxDirectModeBatchRequestBodySizeInBytes);
if (!this.executorsPerContainer.TryAdd(containerLink, newExecutor))
{
newExecutor.Dispose();
}

return this.executorsPerContainer[containerLink];
}

public void Dispose()
{
foreach (KeyValuePair<string, BatchAsyncContainerExecutor> cacheEntry in this.executorsPerContainer)
{
cacheEntry.Value.Dispose();
}
}
}
}
8 changes: 8 additions & 0 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -296,6 +297,7 @@ internal CosmosClient(
internal RequestInvokerHandler RequestHandler { get; private set; }
internal CosmosResponseFactory ResponseFactory { get; private set; }
internal CosmosClientContext ClientContext { get; private set; }
internal BatchAsyncContainerExecutorCache BatchExecutorCache { get; private set; } = new BatchAsyncContainerExecutorCache();

/// <summary>
/// Read Azure Cosmos DB account properties <see cref="Microsoft.Azure.Cosmos.AccountProperties"/>
Expand Down Expand Up @@ -743,6 +745,12 @@ protected virtual void Dispose(bool disposing)
this.DocumentClient.Dispose();
this.DocumentClient = null;
}

if (this.BatchExecutorCache != null)
{
this.BatchExecutorCache.Dispose();
this.BatchExecutorCache = null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,7 @@ internal virtual BatchAsyncContainerExecutor InitializeBatchExecutorForContainer
return null;
}

return new BatchAsyncContainerExecutor(
this,
this.ClientContext,
Constants.MaxOperationsInDirectModeBatchRequest,
Constants.MaxDirectModeBatchRequestBodySizeInBytes);
return this.ClientContext.Client.BatchExecutorCache.GetExecutorForContainer(this, this.ClientContext);
}

private Task<ResponseMessage> ReplaceStreamInternalAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class BatchAsyncContainerExecutorCacheTests
{
[TestMethod]
public async Task ConcurrentGet_ReturnsSameExecutorInstance()
{
Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));

CosmosClientContext context = new ClientContextCore(
client: mockClient.Object,
clientOptions: new CosmosClientOptions() { AllowBulkExecution = true },
userJsonSerializer: null,
defaultJsonSerializer: null,
sqlQuerySpecSerializer: null,
cosmosResponseFactory: null,
requestHandler: null,
documentClient: null);

DatabaseCore db = new DatabaseCore(context, "test");

List<Task<ContainerCore>> tasks = new List<Task<ContainerCore>>();
for (int i = 0; i < 20; i++)
{
tasks.Add(Task.Run(() => Task.FromResult((ContainerCore)db.GetContainer("test"))));
}

await Task.WhenAll(tasks);

BatchAsyncContainerExecutor firstExecutor = tasks[0].Result.BatchExecutor;
Assert.IsNotNull(firstExecutor);
for (int i = 1; i < 20; i++)
{
BatchAsyncContainerExecutor otherExecutor = tasks[i].Result.BatchExecutor;
Assert.AreEqual(firstExecutor, otherExecutor);
}
}

[TestMethod]
[Timeout(60000)]
public async Task SingleTaskScheduler_ExecutorTest()
{
Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));

CosmosClientContext context = new ClientContextCore(
client: mockClient.Object,
clientOptions: new CosmosClientOptions() { AllowBulkExecution = true },
userJsonSerializer: null,
defaultJsonSerializer: null,
sqlQuerySpecSerializer: null,
cosmosResponseFactory: null,
requestHandler: null,
documentClient: null);

DatabaseCore db = new DatabaseCore(context, "test");

List<Task<ContainerCore>> tasks = new List<Task<ContainerCore>>();
for (int i = 0; i < 20; i++)
{
tasks.Add(
Task.Factory.StartNew(() => (ContainerCore)db.GetContainer("test"),
CancellationToken.None,
TaskCreationOptions.None,
new SingleTaskScheduler()));
}

await Task.WhenAll(tasks);

BatchAsyncContainerExecutor firstExecutor = tasks[0].Result.BatchExecutor;
Assert.IsNotNull(firstExecutor);
for (int i = 1; i < 20; i++)
{
BatchAsyncContainerExecutor otherExecutor = tasks[i].Result.BatchExecutor;
Assert.AreEqual(firstExecutor, otherExecutor);
}
}

[TestMethod]
public void Null_When_OptionsOff()
{
Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));

CosmosClientContext context = new ClientContextCore(
client: mockClient.Object,
clientOptions: new CosmosClientOptions() { },
userJsonSerializer: null,
defaultJsonSerializer: null,
sqlQuerySpecSerializer: null,
cosmosResponseFactory: null,
requestHandler: null,
documentClient: null);

DatabaseCore db = new DatabaseCore(context, "test");
ContainerCore container = (ContainerCore)db.GetContainer("test");
Assert.IsNull(container.BatchExecutor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.Core.Tests
using System.Net.Http;
using Microsoft.Azure.Documents;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;

[TestClass]
public class CosmosClientResourceUnitTests
Expand All @@ -19,8 +20,11 @@ public void ValidateUriGenerationForResources()
string databaseId = "db1234";
string crId = "cr42";

Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));

CosmosClientContext context = new ClientContextCore(
client: null,
client: mockClient.Object,
clientOptions: new CosmosClientOptions(),
userJsonSerializer: null,
defaultJsonSerializer: null,
Expand Down Expand Up @@ -120,8 +124,11 @@ public void InitializeBatchExecutorForContainer_Null_WhenAllowBulk_False()
string databaseId = "db1234";
string crId = "cr42";

Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));

CosmosClientContext context = new ClientContextCore(
client: null,
client: mockClient.Object,
clientOptions: new CosmosClientOptions(),
userJsonSerializer: null,
defaultJsonSerializer: null,
Expand All @@ -141,8 +148,11 @@ public void InitializeBatchExecutorForContainer_NotNull_WhenAllowBulk_True()
string databaseId = "db1234";
string crId = "cr42";

Mock<CosmosClient> mockClient = new Mock<CosmosClient>();
mockClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));

CosmosClientContext context = new ClientContextCore(
client: null,
client: mockClient.Object,
clientOptions: new CosmosClientOptions() { AllowBulkExecution = true },
userJsonSerializer: null,
defaultJsonSerializer: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@
"Attributes": [],
"MethodInfo": "System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.DatabaseResponse] CreateDatabaseAsync(System.String, System.Nullable`1[System.Int32], Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)"
},
"System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.DatabaseResponse] CreateDatabaseIfNotExistsAsync(System.String, System.Nullable`1[System.Int32], Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)[System.Runtime.CompilerServices.AsyncStateMachineAttribute(typeof(Microsoft.Azure.Cosmos.CosmosClient+<CreateDatabaseIfNotExistsAsync>d__37))]": {
"System.Threading.Tasks.Task`1[Microsoft.Azure.Cosmos.DatabaseResponse] CreateDatabaseIfNotExistsAsync(System.String, System.Nullable`1[System.Int32], Microsoft.Azure.Cosmos.RequestOptions, System.Threading.CancellationToken)[System.Runtime.CompilerServices.AsyncStateMachineAttribute(typeof(Microsoft.Azure.Cosmos.CosmosClient+<CreateDatabaseIfNotExistsAsync>d__41))]": {
"Type": "Method",
"Attributes": [
"AsyncStateMachineAttribute"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,6 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<Folder Include="Batch\" />
</ItemGroup>

<PropertyGroup>
<SignAssembly>true</SignAssembly>
Expand Down
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- [#835](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/835) Fixed a bug that caused sortedRanges exceptions
- [#846](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/846) Statistics not getting populated correctly on CosmosException.
- [#857](https://github.com/Azure/azure-cosmos-dotnet-v3/pull/857) Fixed reusability of the Bulk support across Container instances

## <a name="3.2.0"/> [3.2.0](https://www.nuget.org/packages/Microsoft.Azure.Cosmos/3.2.0) - 2019-09-17

Expand Down

0 comments on commit efc5c78

Please sign in to comment.