Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DocumentClient: Fixes ObjectDisposedException during Background Refresh by adding Cancellation Token #3278

Merged
merged 9 commits into from
Jun 21, 2022
15 changes: 13 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,13 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
//SessionContainer.
internal ISessionContainer sessionContainer;

private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
imanvt marked this conversation as resolved.
Show resolved Hide resolved

private AsyncLazy<QueryPartitionProvider> queryPartitionProvider;

private DocumentClientEventSource eventSource;
private Func<bool, Task<bool>> initializeTaskFactory;
internal AsyncCacheNonBlocking<string, bool> initTaskCache = new AsyncCacheNonBlocking<string, bool>();
internal AsyncCacheNonBlocking<string, bool> initTaskCache;

private JsonSerializerSettings serializerSettings;
private event EventHandler<SendingRequestEventArgs> sendingRequest;
Expand Down Expand Up @@ -218,6 +220,8 @@ public DocumentClient(Uri serviceEndpoint,
}

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);

}

/// <summary>
Expand Down Expand Up @@ -459,6 +463,7 @@ internal DocumentClient(Uri serviceEndpoint,
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(cosmosAuthorization));
this.transportClientHandlerFactory = transportClientHandlerFactory;
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);

this.Initialize(
serviceEndpoint: serviceEndpoint,
Expand Down Expand Up @@ -1200,6 +1205,11 @@ public void Dispose()
return;
}

if (this.cancellationTokenSource != null && !this.cancellationTokenSource.IsCancellationRequested)
imanvt marked this conversation as resolved.
Show resolved Hide resolved
{
this.cancellationTokenSource.Cancel();
imanvt marked this conversation as resolved.
Show resolved Hide resolved
}

if (this.StoreModel != null)
{
this.StoreModel.Dispose();
Expand Down Expand Up @@ -6596,7 +6606,8 @@ private async Task InitializeGatewayConfigurationReaderAsync()
serviceEndpoint: this.ServiceEndpoint,
cosmosAuthorization: this.cosmosAuthorization,
connectionPolicy: this.ConnectionPolicy,
httpClient: this.httpClient);
httpClient: this.httpClient,
cancellationToken: this.cancellationTokenSource.Token);

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

Expand Down
10 changes: 7 additions & 3 deletions Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Globalization;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Routing;
Expand All @@ -21,17 +22,20 @@ internal sealed class GatewayAccountReader
private readonly AuthorizationTokenProvider cosmosAuthorization;
private readonly CosmosHttpClient httpClient;
private readonly Uri serviceEndpoint;
private CancellationToken cancellationToken;
imanvt marked this conversation as resolved.
Show resolved Hide resolved

// Backlog: Auth abstractions are spilling through. 4 arguments for this CTOR are result of it.
public GatewayAccountReader(Uri serviceEndpoint,
AuthorizationTokenProvider cosmosAuthorization,
ConnectionPolicy connectionPolicy,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
CancellationToken cancellationToken = default)
imanvt marked this conversation as resolved.
Show resolved Hide resolved
{
this.httpClient = httpClient;
this.serviceEndpoint = serviceEndpoint;
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(AuthorizationTokenProvider));
this.connectionPolicy = connectionPolicy;
this.cancellationToken = cancellationToken;
}

private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoint)
Expand All @@ -52,7 +56,7 @@ await this.cosmosAuthorization.AddAuthorizationHeaderAsync(
resourceType: ResourceType.DatabaseAccount,
timeoutPolicy: HttpTimeoutPolicyControlPlaneRead.Instance,
clientSideRequestStatistics: stats,
cancellationToken: default))
cancellationToken: this.cancellationToken))
{
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
{
Expand Down Expand Up @@ -84,7 +88,7 @@ public async Task<AccountProperties> InitializeReaderAsync()
defaultEndpoint: this.serviceEndpoint,
locations: this.connectionPolicy.PreferredLocations,
getDatabaseAccountFn: this.GetDatabaseAccountAsync,
cancellationToken: default);
cancellationToken: this.cancellationToken);

return databaseAccount;
}
Expand Down
8 changes: 6 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationTokenSource cancellationTokenSource;
private readonly ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>> values;
private readonly Func<Exception, bool> removeFromCacheOnBackgroundRefreshException;

Expand All @@ -30,11 +30,15 @@ internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable

public AsyncCacheNonBlocking(
Func<Exception, bool> removeFromCacheOnBackgroundRefreshException = null,
IEqualityComparer<TKey> keyEqualityComparer = null)
IEqualityComparer<TKey> keyEqualityComparer = null,
CancellationToken cancellationToken = default)
{
this.keyEqualityComparer = keyEqualityComparer ?? EqualityComparer<TKey>.Default;
this.values = new ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>>(this.keyEqualityComparer);
this.removeFromCacheOnBackgroundRefreshException = removeFromCacheOnBackgroundRefreshException ?? AsyncCacheNonBlocking<TKey, TValue>.RemoveNotFoundFromCacheOnException;
this.cancellationTokenSource = cancellationToken == default
? new CancellationTokenSource()
: CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
}

public AsyncCacheNonBlocking()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using global::Azure.Core;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
Expand Down Expand Up @@ -230,5 +232,52 @@ public void Builder_ValidateHttpFactory()
WebProxy = null,
};
}

[TestMethod]
public void CosmosClientEarlyDisposeTest()
{
string disposeErrorMsg = "Cannot access a disposed object";
HashSet<string> errors = new HashSet<string>();

void TraceHandler(string message)
{
if (message.Contains(disposeErrorMsg))
{
errors.Add(message);
}
}

DefaultTrace.TraceSource.Listeners.Add(new TestTraceListener { Callback = TraceHandler });
DefaultTrace.InitEventListener();

for (int z = 0; z < 100; ++z)
{
using CosmosClient cosmos = new(ConnectionString);
}

string assertMsg = "";
imanvt marked this conversation as resolved.
Show resolved Hide resolved

foreach (string s in errors)
{
assertMsg += s + "\n";
imanvt marked this conversation as resolved.
Show resolved Hide resolved
}

Assert.AreEqual(0, errors.Count, $"\nErrors found in trace:\n{assertMsg}");
}

private class TestTraceListener : TraceListener
{
public Action<string> Callback { get; set; }
public override bool IsThreadSafe => true;
public override void Write(string message)
{
this.Callback(message);
}

public override void WriteLine(string message)
{
this.Callback(message);
}
}
}
}