Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DocumentClient: Fixes ObjectDisposedException during Background Refresh by adding Cancellation Token #3278

Merged
merged 9 commits into from
Jun 21, 2022
16 changes: 14 additions & 2 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,13 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider
//SessionContainer.
internal ISessionContainer sessionContainer;

private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

private AsyncLazy<QueryPartitionProvider> queryPartitionProvider;

private DocumentClientEventSource eventSource;
private Func<bool, Task<bool>> initializeTaskFactory;
internal AsyncCacheNonBlocking<string, bool> initTaskCache = new AsyncCacheNonBlocking<string, bool>();
internal AsyncCacheNonBlocking<string, bool> initTaskCache;

private JsonSerializerSettings serializerSettings;
private event EventHandler<SendingRequestEventArgs> sendingRequest;
Expand Down Expand Up @@ -218,6 +220,8 @@ public DocumentClient(Uri serviceEndpoint,
}

this.Initialize(serviceEndpoint, connectionPolicy, desiredConsistencyLevel);
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);

}

/// <summary>
Expand Down Expand Up @@ -459,6 +463,7 @@ internal DocumentClient(Uri serviceEndpoint,
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(cosmosAuthorization));
this.transportClientHandlerFactory = transportClientHandlerFactory;
this.IsLocalQuorumConsistency = isLocalQuorumConsistency;
this.initTaskCache = new AsyncCacheNonBlocking<string, bool>(cancellationToken: this.cancellationTokenSource.Token);

this.Initialize(
serviceEndpoint: serviceEndpoint,
Expand Down Expand Up @@ -1200,6 +1205,12 @@ public void Dispose()
return;
}

if (!this.cancellationTokenSource.IsCancellationRequested)
{
this.cancellationTokenSource.Cancel();
imanvt marked this conversation as resolved.
Show resolved Hide resolved
this.cancellationTokenSource.Dispose();
}

if (this.StoreModel != null)
{
this.StoreModel.Dispose();
Expand Down Expand Up @@ -6596,7 +6607,8 @@ private async Task InitializeGatewayConfigurationReaderAsync()
serviceEndpoint: this.ServiceEndpoint,
cosmosAuthorization: this.cosmosAuthorization,
connectionPolicy: this.ConnectionPolicy,
httpClient: this.httpClient);
httpClient: this.httpClient,
cancellationToken: this.cancellationTokenSource.Token);

this.accountServiceConfiguration = new CosmosAccountServiceConfiguration(accountReader.InitializeReaderAsync);

Expand Down
14 changes: 10 additions & 4 deletions Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Azure.Cosmos
using System;
using System.Globalization;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Cosmos.Routing;
Expand All @@ -21,17 +22,20 @@ internal sealed class GatewayAccountReader
private readonly AuthorizationTokenProvider cosmosAuthorization;
private readonly CosmosHttpClient httpClient;
private readonly Uri serviceEndpoint;
private readonly CancellationToken cancellationToken;

// Backlog: Auth abstractions are spilling through. 4 arguments for this CTOR are result of it.
public GatewayAccountReader(Uri serviceEndpoint,
AuthorizationTokenProvider cosmosAuthorization,
ConnectionPolicy connectionPolicy,
CosmosHttpClient httpClient)
CosmosHttpClient httpClient,
CancellationToken cancellationToken = default)
imanvt marked this conversation as resolved.
Show resolved Hide resolved
{
this.httpClient = httpClient;
this.serviceEndpoint = serviceEndpoint;
this.cosmosAuthorization = cosmosAuthorization ?? throw new ArgumentNullException(nameof(AuthorizationTokenProvider));
this.connectionPolicy = connectionPolicy;
this.cancellationToken = cancellationToken;
}

private async Task<AccountProperties> GetDatabaseAccountAsync(Uri serviceEndpoint)
Expand All @@ -52,7 +56,7 @@ await this.cosmosAuthorization.AddAuthorizationHeaderAsync(
resourceType: ResourceType.DatabaseAccount,
timeoutPolicy: HttpTimeoutPolicyControlPlaneRead.Instance,
clientSideRequestStatistics: stats,
cancellationToken: default))
cancellationToken: this.cancellationToken))
{
using (DocumentServiceResponse documentServiceResponse = await ClientExtensions.ParseResponseAsync(responseMessage))
{
Expand All @@ -61,13 +65,15 @@ await this.cosmosAuthorization.AddAuthorizationHeaderAsync(
}
}
catch (OperationCanceledException ex)
// this is the DocumentClient token, which only gets cancelled upon disposal
when (!this.cancellationToken.IsCancellationRequested)
{
// Catch Operation Cancelled Exception and convert to Timeout 408 if the user did not cancel it.
using (ITrace trace = Trace.GetRootTrace("Account Read Exception", TraceComponent.Transport, TraceLevel.Info))
{
trace.AddDatum("Client Side Request Stats", stats);
throw CosmosExceptionFactory.CreateRequestTimeoutException(
message: ex.Data?["Message"].ToString(),
message: ex.Data?["Message"]?.ToString() ?? ex.Message,
headers: new Headers()
{
ActivityId = System.Diagnostics.Trace.CorrelationManager.ActivityId.ToString()
Expand All @@ -84,7 +90,7 @@ public async Task<AccountProperties> InitializeReaderAsync()
defaultEndpoint: this.serviceEndpoint,
locations: this.connectionPolicy.PreferredLocations,
getDatabaseAccountFn: this.GetDatabaseAccountAsync,
cancellationToken: default);
cancellationToken: this.cancellationToken);

return databaseAccount;
}
Expand Down
8 changes: 6 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationTokenSource cancellationTokenSource;
private readonly ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>> values;
private readonly Func<Exception, bool> removeFromCacheOnBackgroundRefreshException;

Expand All @@ -30,11 +30,15 @@ internal sealed class AsyncCacheNonBlocking<TKey, TValue> : IDisposable

public AsyncCacheNonBlocking(
Func<Exception, bool> removeFromCacheOnBackgroundRefreshException = null,
IEqualityComparer<TKey> keyEqualityComparer = null)
IEqualityComparer<TKey> keyEqualityComparer = null,
CancellationToken cancellationToken = default)
{
this.keyEqualityComparer = keyEqualityComparer ?? EqualityComparer<TKey>.Default;
this.values = new ConcurrentDictionary<TKey, AsyncLazyWithRefreshTask<TValue>>(this.keyEqualityComparer);
this.removeFromCacheOnBackgroundRefreshException = removeFromCacheOnBackgroundRefreshException ?? AsyncCacheNonBlocking<TKey, TValue>.RemoveNotFoundFromCacheOnException;
this.cancellationTokenSource = cancellationToken == default
? new CancellationTokenSource()
: CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
}

public AsyncCacheNonBlocking()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ namespace Microsoft.Azure.Cosmos.Tests
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using global::Azure.Core;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Fluent;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
Expand Down Expand Up @@ -230,5 +232,55 @@ public void Builder_ValidateHttpFactory()
WebProxy = null,
};
}

[TestMethod]
public void CosmosClientEarlyDisposeTest()
{
string disposeErrorMsg = "Cannot access a disposed object";
HashSet<string> errors = new HashSet<string>();

void TraceHandler(string message)
{
if (message.Contains(disposeErrorMsg))
{
errors.Add(message);
}
}

DefaultTrace.TraceSource.Listeners.Add(new TestTraceListener { Callback = TraceHandler });
DefaultTrace.InitEventListener();

for (int z = 0; z < 100; ++z)
{
using CosmosClient cosmos = new(ConnectionString, new CosmosClientOptions
{
EnableClientTelemetry = true
});
}

string assertMsg = String.Empty;

foreach (string s in errors)
{
assertMsg += s + Environment.NewLine;
}

Assert.AreEqual(0, errors.Count, $"{Environment.NewLine}Errors found in trace:{Environment.NewLine}{assertMsg}");
}

private class TestTraceListener : TraceListener
{
public Action<string> Callback { get; set; }
public override bool IsThreadSafe => true;
public override void Write(string message)
{
this.Callback(message);
}

public override void WriteLine(string message)
{
this.Callback(message);
}
}
}
}