From d1ca1da54aba255149ff85620ffde82428711783 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Thu, 10 Feb 2022 15:58:06 -0800 Subject: [PATCH 1/8] Fix task intialization. --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 84 ++++++----- .../ClientTests.cs | 133 ++++++++++++++++++ .../Utils/HttpHandlerHelper.cs | 10 +- .../CosmosClientTests.cs | 2 + 4 files changed, 189 insertions(+), 40 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index c9bc0b348f..c2c93cab4d 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -167,6 +167,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private DocumentClientEventSource eventSource; internal Task initializeTask; + internal Func initializeTaskFactory; private JsonSerializerSettings serializerSettings; private event EventHandler sendingRequest; @@ -918,24 +919,24 @@ internal virtual void Initialize(Uri serviceEndpoint, this.eventSource = DocumentClientEventSource.Instance; - this.initializeTask = TaskHelper.InlineIfPossibleAsync( - () => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory), - new ResourceThrottleRetryPolicy( - this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests, - this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds)); - - // ContinueWith on the initialization task is needed for handling the UnobservedTaskException - // if this task throws for some reason. Awaiting inside a constructor is not supported and - // even if we had to await inside GetInitializationTask to catch the exception, that will - // be a blocking call. In such cases, the recommended approach is to "handle" the - // UnobservedTaskException by using ContinueWith method w/ TaskContinuationOptions.OnlyOnFaulted - // and accessing the Exception property on the target task. + this.initializeTaskFactory = () => + { + Task task = TaskHelper.InlineIfPossibleAsync(() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory), + new ResourceThrottleRetryPolicy( + this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests, + this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds)); + + // ContinueWith on the initialization task is needed for handling the UnobservedTaskException + // if this task throws for some reason. Awaiting inside a constructor is not supported and + // even if we had to await inside GetInitializationTask to catch the exception, that will + // be a blocking call. In such cases, the recommended approach is to "handle" the + // UnobservedTaskException by using ContinueWith method w/ TaskContinuationOptions.OnlyOnFaulted + // and accessing the Exception property on the target task. #pragma warning disable VSTHRD110 // Observe result of async calls - this.initializeTask.ContinueWith(t => + task.ContinueWith(t => DefaultTrace.TraceWarning("initializeTask failed {0}", t.Exception), TaskContinuationOptions.OnlyOnFaulted); #pragma warning restore VSTHRD110 // Observe result of async calls - { - DefaultTrace.TraceWarning("initializeTask failed {0}", t.Exception); - }, TaskContinuationOptions.OnlyOnFaulted); + return task; + }; this.traceId = Interlocked.Increment(ref DocumentClient.idCounter); DefaultTrace.TraceInformation(string.Format( @@ -1431,15 +1432,9 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) // client which is unusable and can resume working if it failed initialization once. // If we have to reinitialize the client, it needs to happen in thread safe manner so that // we dont re-initalize the task again for each incoming call. - Task initTask = null; - - lock (this.initializationSyncLock) - { - initTask = this.initializeTask; - } - try { + Task initTask = this.GetOrCreateInitializationTaskAsync(); await initTask; this.isSuccessfullyInitialized = true; return; @@ -1452,23 +1447,13 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) } catch (Exception e) { - DefaultTrace.TraceWarning("initializeTask failed {0}", e.ToString()); - childTrace.AddDatum("initializeTask failed", e.ToString()); - } - - lock (this.initializationSyncLock) - { - // if the task has not been updated by another caller, update it - if (object.ReferenceEquals(this.initializeTask, initTask)) - { - this.initializeTask = this.GetInitializationTaskAsync(storeClientFactory: null); - } - - initTask = this.initializeTask; + DefaultTrace.TraceWarning("initializeTask failed {0}", e); + childTrace.AddDatum("initializeTask failed", e); } try { + Task initTask = this.GetOrCreateInitializationTaskAsync(); await initTask; this.isSuccessfullyInitialized = true; } @@ -1478,7 +1463,6 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) dce: ex, trace: trace); } - } } @@ -6340,6 +6324,32 @@ Task IDocumentClientInternal.GetDatabaseAccountInternalAsync( return this.GetDatabaseAccountPrivateAsync(serviceEndpoint, cancellationToken); } + private Task GetOrCreateInitializationTaskAsync() + { + Task task = this.initializeTask; + if (task != null && !task.IsCanceled && !task.IsFaulted) + { + return task; + } + + lock (this.initializationSyncLock) + { + if (this.initializeTask == null) + { + this.initializeTask = this.initializeTaskFactory(); + return this.initializeTask; + } + + if (!this.initializeTask.IsFaulted && !this.initializeTask.IsCanceled) + { + return this.initializeTask; + } + + this.initializeTask = this.initializeTaskFactory(); + return this.initializeTask; + } + } + private async Task GetDatabaseAccountPrivateAsync(Uri serviceEndpoint, CancellationToken cancellationToken = default) { await this.EnsureValidClientAsync(NoOpTrace.Singleton); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs index 41a28a656a..ed92c84f00 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs @@ -7,12 +7,15 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System; using System.Collections.Generic; using System.Diagnostics; + using System.Formats.Asn1; using System.IO; using System.Linq; using System.Net; using System.Net.Http; using System.Net.NetworkInformation; using System.Reflection; + using System.Text; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.Services.Management.Tests.LinqProviderTests; @@ -28,6 +31,136 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests public class ClientTests { [TestMethod] + public async Task ValidateExceptionOnInitTask() + { + int httpCallCount = 0; + HttpClientHandlerHelper httpClientHandlerHelper = new HttpClientHandlerHelper() + { + RequestCallBack = (request, cancellToken) => + { + httpCallCount++; + return null; + } + }; + + using CosmosClient cosmosClient = new CosmosClient( + accountEndpoint: "https://localhost:8081", + authKeyOrResourceToken: Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), + clientOptions: new CosmosClientOptions() + { + HttpClientFactory = () => new HttpClient(httpClientHandlerHelper), + }); + + CosmosException cosmosException1 = null; + try + { + await cosmosClient.GetContainer("db", "c").ReadItemAsync("Random", new Cosmos.PartitionKey("DoesNotExist")); + } + catch (CosmosException ex) + { + cosmosException1 = ex; + Assert.IsTrue(httpCallCount > 0); + } + + httpCallCount = 0; + try + { + await cosmosClient.GetContainer("db", "c").ReadItemAsync("Random2", new Cosmos.PartitionKey("DoesNotExist2")); + } + catch (CosmosException ex) + { + Assert.IsFalse(object.ReferenceEquals(ex, cosmosException1)); + Assert.IsTrue(httpCallCount > 0); + } + } + + [TestMethod] + public async Task InitTaskThreadSafe() + { + int httpCallCount = 0; + bool delayCallBack = true; + HttpClientHandlerHelper httpClientHandlerHelper = new HttpClientHandlerHelper() + { + RequestCallBack = async (request, cancellToken) => + { + httpCallCount++; + while (delayCallBack) + { + await Task.Delay(TimeSpan.FromMilliseconds(100)); + } + + return null; + } + }; + + using CosmosClient cosmosClient = new CosmosClient( + accountEndpoint: "https://localhost:8081", + authKeyOrResourceToken: Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), + clientOptions: new CosmosClientOptions() + { + HttpClientFactory = () => new HttpClient(httpClientHandlerHelper), + }); + + List tasks = new List(); + + Container container = cosmosClient.GetContainer("db", "c"); + for (int i = 0; i < 10; i++) + { + tasks.Add(this.ReadNotFound(container)); + } + + Stopwatch sw = Stopwatch.StartNew(); + while(this.TaskStartedCount < 10 && sw.Elapsed.TotalSeconds < 2) + { + await Task.Delay(TimeSpan.FromMilliseconds(50)); + } + + Assert.AreEqual(10, this.TaskStartedCount, "Tasks did not start"); + delayCallBack = false; + + await Task.WhenAll(tasks); + + Assert.AreEqual(1, httpCallCount, "Only the first task should do the http call. All other should wait on the first task"); + + // Reset counters and retry the client to verify a new http call is done for new requests + tasks.Clear(); + delayCallBack = true; + this.TaskStartedCount = 0; + for (int i = 0; i < 10; i++) + { + tasks.Add(this.ReadNotFound(container)); + } + + sw = Stopwatch.StartNew(); + while (this.TaskStartedCount < 10 && sw.Elapsed.TotalSeconds < 2) + { + await Task.Delay(TimeSpan.FromMilliseconds(50)); + } + + Assert.AreEqual(10, this.TaskStartedCount, "Tasks did not start"); + delayCallBack = false; + + await Task.WhenAll(tasks); + + Assert.AreEqual(2, httpCallCount, "Should recreate the task after the first failure"); + } + + private int TaskStartedCount = 0; + + private async Task ReadNotFound(Container container) + { + try + { + Interlocked.Increment(ref this.TaskStartedCount); + await container.ReadItemAsync("Random", new Cosmos.PartitionKey("DoesNotExist")); + throw new Exception("Should throw a CosmosException 403"); + } + catch (CosmosException ex) + { + return ex; + } + } + public async Task ResourceResponseStreamingTest() { using (DocumentClient client = TestCommon.CreateClient(true)) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/HttpHandlerHelper.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/HttpHandlerHelper.cs index 86e593eae9..bd84a7ca44 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/HttpHandlerHelper.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/HttpHandlerHelper.cs @@ -17,18 +17,22 @@ public HttpClientHandlerHelper() : base(new HttpClientHandler()) public Func> RequestCallBack { get; set; } - protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { if(this.RequestCallBack != null) { Task response = this.RequestCallBack(request, cancellationToken); if(response != null) { - return response; + HttpResponseMessage httpResponse = await response; + if(httpResponse != null) + { + return httpResponse; + } } } - return base.SendAsync(request, cancellationToken); + return await base.SendAsync(request, cancellationToken); } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs index fef91d037a..6127225d4f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs @@ -10,12 +10,14 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Linq; using System.Net; using System.Net.Http; + using System.Text; using System.Threading; using System.Threading.Tasks; using global::Azure.Core; using Microsoft.Azure.Cosmos.Fluent; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; + using Newtonsoft.Json.Linq; [TestClass] public class CosmosClientTests From 604bf876be51e644329f8e24ffaaaaa7cba17a73 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Fri, 11 Feb 2022 06:00:08 -0800 Subject: [PATCH 2/8] Test fixes --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 7 ++- .../ClientTests.cs | 56 ++++++++----------- 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index c2c93cab4d..f5cbdffa69 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -166,8 +166,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private AsyncLazy queryPartitionProvider; private DocumentClientEventSource eventSource; + private Func initializeTaskFactory; internal Task initializeTask; - internal Func initializeTaskFactory; private JsonSerializerSettings serializerSettings; private event EventHandler sendingRequest; @@ -938,6 +938,11 @@ internal virtual void Initialize(Uri serviceEndpoint, return task; }; + lock (this.initializationSyncLock) + { + this.initializeTask = this.initializeTaskFactory(); + } + this.traceId = Interlocked.Increment(ref DocumentClient.idCounter); DefaultTrace.TraceInformation(string.Format( CultureInfo.InvariantCulture, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs index ed92c84f00..636e587a7c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientTests.cs @@ -38,7 +38,7 @@ public async Task ValidateExceptionOnInitTask() { RequestCallBack = (request, cancellToken) => { - httpCallCount++; + Interlocked.Increment(ref httpCallCount); return null; } }; @@ -83,7 +83,7 @@ public async Task InitTaskThreadSafe() { RequestCallBack = async (request, cancellToken) => { - httpCallCount++; + Interlocked.Increment(ref httpCallCount); while (delayCallBack) { await Task.Delay(TimeSpan.FromMilliseconds(100)); @@ -104,45 +104,33 @@ public async Task InitTaskThreadSafe() List tasks = new List(); Container container = cosmosClient.GetContainer("db", "c"); - for (int i = 0; i < 10; i++) + + for(int loop = 0; loop < 3; loop++) { - tasks.Add(this.ReadNotFound(container)); - } + for (int i = 0; i < 10; i++) + { + tasks.Add(this.ReadNotFound(container)); + } - Stopwatch sw = Stopwatch.StartNew(); - while(this.TaskStartedCount < 10 && sw.Elapsed.TotalSeconds < 2) - { - await Task.Delay(TimeSpan.FromMilliseconds(50)); - } - - Assert.AreEqual(10, this.TaskStartedCount, "Tasks did not start"); - delayCallBack = false; + Stopwatch sw = Stopwatch.StartNew(); + while(this.TaskStartedCount < 10 && sw.Elapsed.TotalSeconds < 2) + { + await Task.Delay(TimeSpan.FromMilliseconds(50)); + } - await Task.WhenAll(tasks); + Assert.AreEqual(10, this.TaskStartedCount, "Tasks did not start"); + delayCallBack = false; - Assert.AreEqual(1, httpCallCount, "Only the first task should do the http call. All other should wait on the first task"); + await Task.WhenAll(tasks); - // Reset counters and retry the client to verify a new http call is done for new requests - tasks.Clear(); - delayCallBack = true; - this.TaskStartedCount = 0; - for (int i = 0; i < 10; i++) - { - tasks.Add(this.ReadNotFound(container)); - } + Assert.AreEqual(1, httpCallCount, "Only the first task should do the http call. All other should wait on the first task"); - sw = Stopwatch.StartNew(); - while (this.TaskStartedCount < 10 && sw.Elapsed.TotalSeconds < 2) - { - await Task.Delay(TimeSpan.FromMilliseconds(50)); + // Reset counters and retry the client to verify a new http call is done for new requests + tasks.Clear(); + delayCallBack = true; + this.TaskStartedCount = 0; + httpCallCount = 0; } - - Assert.AreEqual(10, this.TaskStartedCount, "Tasks did not start"); - delayCallBack = false; - - await Task.WhenAll(tasks); - - Assert.AreEqual(2, httpCallCount, "Should recreate the task after the first failure"); } private int TaskStartedCount = 0; From 269bc46d70559ee795cf73c5364146587adcc81e Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Fri, 11 Feb 2022 10:02:50 -0800 Subject: [PATCH 3/8] Move to AsyncCacheNonBlocking --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 90 ++++++++----------- .../DocumentClientExtensions.cs | 7 +- 2 files changed, 42 insertions(+), 55 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index f5cbdffa69..28de886bf1 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -166,8 +166,8 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private AsyncLazy queryPartitionProvider; private DocumentClientEventSource eventSource; - private Func initializeTaskFactory; - internal Task initializeTask; + private Func> initializeTaskFactory; + internal AsyncCacheNonBlocking initTaskCache = new AsyncCacheNonBlocking(); private JsonSerializerSettings serializerSettings; private event EventHandler sendingRequest; @@ -921,7 +921,8 @@ internal virtual void Initialize(Uri serviceEndpoint, this.initializeTaskFactory = () => { - Task task = TaskHelper.InlineIfPossibleAsync(() => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory), + Task task = TaskHelper.InlineIfPossible( + () => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory), new ResourceThrottleRetryPolicy( this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests, this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds)); @@ -938,11 +939,6 @@ internal virtual void Initialize(Uri serviceEndpoint, return task; }; - lock (this.initializationSyncLock) - { - this.initializeTask = this.initializeTaskFactory(); - } - this.traceId = Interlocked.Increment(ref DocumentClient.idCounter); DefaultTrace.TraceInformation(string.Format( CultureInfo.InvariantCulture, @@ -957,7 +953,7 @@ internal virtual void Initialize(Uri serviceEndpoint, } // Always called from under the lock except when called from Intilialize method during construction. - private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFactory) + private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFactory) { await this.InitializeGatewayConfigurationReaderAsync(); @@ -990,6 +986,8 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFac { this.InitializeDirectConnectivity(storeClientFactory); } + + return true; } private async Task InitializeCachesAsync(string databaseName, DocumentCollection collection, CancellationToken cancellationToken) @@ -1439,10 +1437,11 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) // we dont re-initalize the task again for each incoming call. try { - Task initTask = this.GetOrCreateInitializationTaskAsync(); - await initTask; - this.isSuccessfullyInitialized = true; - return; + this.isSuccessfullyInitialized = await this.initTaskCache.GetAsync( + key: "InitTask", + singleValueInitFunc: this.initializeTaskFactory, + forceRefresh: false, + callBackOnForceRefresh: null); } catch (DocumentClientException ex) { @@ -1455,19 +1454,6 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) DefaultTrace.TraceWarning("initializeTask failed {0}", e); childTrace.AddDatum("initializeTask failed", e); } - - try - { - Task initTask = this.GetOrCreateInitializationTaskAsync(); - await initTask; - this.isSuccessfullyInitialized = true; - } - catch (DocumentClientException ex) - { - throw Resource.CosmosExceptions.CosmosExceptionFactory.Create( - dce: ex, - trace: trace); - } } } @@ -6329,31 +6315,31 @@ Task IDocumentClientInternal.GetDatabaseAccountInternalAsync( return this.GetDatabaseAccountPrivateAsync(serviceEndpoint, cancellationToken); } - private Task GetOrCreateInitializationTaskAsync() - { - Task task = this.initializeTask; - if (task != null && !task.IsCanceled && !task.IsFaulted) - { - return task; - } - - lock (this.initializationSyncLock) - { - if (this.initializeTask == null) - { - this.initializeTask = this.initializeTaskFactory(); - return this.initializeTask; - } - - if (!this.initializeTask.IsFaulted && !this.initializeTask.IsCanceled) - { - return this.initializeTask; - } - - this.initializeTask = this.initializeTaskFactory(); - return this.initializeTask; - } - } + //private Task GetOrCreateInitializationTaskAsync() + //{ + // Task task = this.initializeTask; + // if (task != null && !task.IsCanceled && !task.IsFaulted) + // { + // return task; + // } + + // lock (this.initializationSyncLock) + // { + // if (this.initializeTask == null) + // { + // this.initializeTask = this.initializeTaskFactory(); + // return this.initializeTask; + // } + + // if (!this.initializeTask.IsFaulted && !this.initializeTask.IsCanceled) + // { + // return this.initializeTask; + // } + + // this.initializeTask = this.initializeTaskFactory(); + // return this.initializeTask; + // } + //} private async Task GetDatabaseAccountPrivateAsync(Uri serviceEndpoint, CancellationToken cancellationToken = default) { @@ -6737,7 +6723,7 @@ private JsonSerializerSettings GetSerializerSettingsForRequest(Documents.Client. private INameValueCollection GetRequestHeaders(Documents.Client.RequestOptions options) { Debug.Assert( - this.initializeTask.IsCompleted, + this.isSuccessfullyInitialized, "GetRequestHeaders should be called after initialization task has been awaited to avoid blocking while accessing ConsistencyLevel property"); INameValueCollection headers = new StoreRequestNameValueCollection(); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientExtensions.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientExtensions.cs index f385b9275e..aef70c7160 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientExtensions.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/DocumentClientExtensions.cs @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.IO; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; //Internal Test hooks. @@ -21,7 +22,7 @@ internal static class DocumentClientExtensions //This will lock the client instance to a particular replica Index. public static void LockClient(this DocumentClient client, uint replicaIndex) { - client.initializeTask.Wait(); + client.EnsureValidClientAsync(NoOpTrace.Singleton).Wait(); ServerStoreModel serverStoreModel = (client.StoreModel as ServerStoreModel); if (serverStoreModel != null) { @@ -31,7 +32,7 @@ public static void LockClient(this DocumentClient client, uint replicaIndex) public static void ForceAddressRefresh(this DocumentClient client, bool forceAddressRefresh) { - client.initializeTask.Wait(); + client.EnsureValidClientAsync(NoOpTrace.Singleton).Wait(); ServerStoreModel serverStoreModel = (client.StoreModel as ServerStoreModel); if (serverStoreModel != null) { @@ -42,7 +43,7 @@ public static void ForceAddressRefresh(this DocumentClient client, bool forceAdd //Returns the address of replica. public static string GetAddress(this DocumentClient client) { - client.initializeTask.Wait(); + client.EnsureValidClientAsync(NoOpTrace.Singleton).Wait(); return (client.StoreModel as ServerStoreModel).LastReadAddress; } } From 938355b459e61e6292caf1d2b5bc29e045342009 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Fri, 11 Feb 2022 12:51:18 -0800 Subject: [PATCH 4/8] Fixed logic to throw a CosmosException --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 44 ++++++++------------ 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 28de886bf1..f4458af141 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -1435,10 +1435,11 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) // client which is unusable and can resume working if it failed initialization once. // If we have to reinitialize the client, it needs to happen in thread safe manner so that // we dont re-initalize the task again for each incoming call. + const string key = "InitTask"; try { this.isSuccessfullyInitialized = await this.initTaskCache.GetAsync( - key: "InitTask", + key: key, singleValueInitFunc: this.initializeTaskFactory, forceRefresh: false, callBackOnForceRefresh: null); @@ -1454,6 +1455,21 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) DefaultTrace.TraceWarning("initializeTask failed {0}", e); childTrace.AddDatum("initializeTask failed", e); } + + try + { + this.isSuccessfullyInitialized = await this.initTaskCache.GetAsync( + key: key, + singleValueInitFunc: this.initializeTaskFactory, + forceRefresh: false, + callBackOnForceRefresh: null); + } + catch (DocumentClientException ex) + { + throw Resource.CosmosExceptions.CosmosExceptionFactory.Create( + dce: ex, + trace: trace); + } } } @@ -6315,32 +6331,6 @@ Task IDocumentClientInternal.GetDatabaseAccountInternalAsync( return this.GetDatabaseAccountPrivateAsync(serviceEndpoint, cancellationToken); } - //private Task GetOrCreateInitializationTaskAsync() - //{ - // Task task = this.initializeTask; - // if (task != null && !task.IsCanceled && !task.IsFaulted) - // { - // return task; - // } - - // lock (this.initializationSyncLock) - // { - // if (this.initializeTask == null) - // { - // this.initializeTask = this.initializeTaskFactory(); - // return this.initializeTask; - // } - - // if (!this.initializeTask.IsFaulted && !this.initializeTask.IsCanceled) - // { - // return this.initializeTask; - // } - - // this.initializeTask = this.initializeTaskFactory(); - // return this.initializeTask; - // } - //} - private async Task GetDatabaseAccountPrivateAsync(Uri serviceEndpoint, CancellationToken cancellationToken = default) { await this.EnsureValidClientAsync(NoOpTrace.Singleton); From 79d93537c98bcfb639d6bc22f4d7622542969b0b Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Fri, 11 Feb 2022 12:57:29 -0800 Subject: [PATCH 5/8] Remove unused variable --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 5 ----- .../CosmosClientTests.cs | 12 +++++------- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index f4458af141..d3b36fb75b 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -12,8 +12,6 @@ namespace Microsoft.Azure.Cosmos using System.Linq; using System.Net; using System.Net.Http; - using System.Net.Http.Headers; - using System.Runtime.CompilerServices; using System.Security; using System.Text; using System.Threading; @@ -144,7 +142,6 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider //Private state. private bool isSuccessfullyInitialized; private bool isDisposed; - private object initializationSyncLock; // guards initializeTask // creator of TransportClient is responsible for disposing it. private IStoreClientFactory storeClientFactory; @@ -915,8 +912,6 @@ internal virtual void Initialize(Uri serviceEndpoint, // Setup the proxy to be used based on connection mode. // For gateway: GatewayProxy. // For direct: WFStoreProxy [set in OpenAsync()]. - this.initializationSyncLock = new object(); - this.eventSource = DocumentClientEventSource.Instance; this.initializeTaskFactory = () => diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs index 6127225d4f..f24e7981ef 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosClientTests.cs @@ -10,14 +10,12 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Linq; using System.Net; using System.Net.Http; - using System.Text; using System.Threading; using System.Threading.Tasks; using global::Azure.Core; using Microsoft.Azure.Cosmos.Fluent; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; - using Newtonsoft.Json.Linq; [TestClass] public class CosmosClientTests @@ -65,8 +63,8 @@ public async Task TestDispose() await asyncFunc(); Assert.Fail("Should throw ObjectDisposedException"); } - catch (CosmosObjectDisposedException e) - { + catch (CosmosObjectDisposedException e) + { string expectedMessage = $"Cannot access a disposed 'CosmosClient'. Follow best practices and use the CosmosClient as a singleton." + $" CosmosClient was disposed at: {cosmosClient.DisposedDateTimeUtc.Value.ToString("o", CultureInfo.InvariantCulture)}; CosmosClient Endpoint: https://localtestcosmos.documents.azure.com/; Created at: {cosmosClient.ClientConfigurationTraceDatum.ClientCreatedDateTimeUtc.ToString("o", CultureInfo.InvariantCulture)}; UserAgent: {userAgent};"; Assert.IsTrue(e.Message.Contains(expectedMessage)); @@ -162,7 +160,7 @@ public async Task ValidateAuthorizationTokenProviderTestAsync() Exception exceptionToThrow = new Exception("TestException"); Mock mockHttpHandler = new Mock(); mockHttpHandler.Setup(x => x.SendAsync( - It.IsAny(), + It.IsAny(), It.IsAny())) .Callback( (request, cancellationToken) => @@ -187,9 +185,9 @@ public async Task ValidateAuthorizationTokenProviderTestAsync() await container.ReadItemAsync(Guid.NewGuid().ToString(), new PartitionKey(Guid.NewGuid().ToString())); } catch (Exception e) when (object.ReferenceEquals(e, exceptionToThrow)) - { + { } - + Assert.IsTrue(validAuth); } From 910ddd2dff4877764326799bf3d3a14ea302dbd7 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Fri, 11 Feb 2022 14:11:17 -0800 Subject: [PATCH 6/8] Dispose the cache --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index d3b36fb75b..c3b82c1698 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -1242,6 +1242,11 @@ public void Dispose() this.queryPartitionProvider.Value.Dispose(); } + if (this.initTaskCache != null) + { + this.initTaskCache.Dispose(); + } + DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId); DefaultTrace.Flush(); From 26b9b084f2656de30473613ffaa5f7d095a3a95b Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Fri, 11 Feb 2022 15:37:25 -0800 Subject: [PATCH 7/8] Create task to avoid waiting until EnsureValidClientAsync is called --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 28 ++++++++------------ 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index c3b82c1698..135e499836 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -107,6 +107,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private const int DefaultRntbdReceiveHangDetectionTimeSeconds = 65; private const int DefaultRntbdSendHangDetectionTimeSeconds = 10; private const bool DefaultEnableCpuMonitor = true; + private const string DefaultInitTaskKey = "InitTaskKey"; //Auth private readonly AuthorizationTokenProvider cosmosAuthorization; @@ -934,6 +935,14 @@ internal virtual void Initialize(Uri serviceEndpoint, return task; }; + // Create the task to start the initialize task + // Task will be awaited on in the EnsureValidClientAsync + Task t = this.initTaskCache.GetAsync( + key: DocumentClient.DefaultInitTaskKey, + singleValueInitFunc: this.initializeTaskFactory, + forceRefresh: false, + callBackOnForceRefresh: null); + this.traceId = Interlocked.Increment(ref DocumentClient.idCounter); DefaultTrace.TraceInformation(string.Format( CultureInfo.InvariantCulture, @@ -1435,11 +1444,10 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) // client which is unusable and can resume working if it failed initialization once. // If we have to reinitialize the client, it needs to happen in thread safe manner so that // we dont re-initalize the task again for each incoming call. - const string key = "InitTask"; try { this.isSuccessfullyInitialized = await this.initTaskCache.GetAsync( - key: key, + key: DocumentClient.DefaultInitTaskKey, singleValueInitFunc: this.initializeTaskFactory, forceRefresh: false, callBackOnForceRefresh: null); @@ -1454,21 +1462,7 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) { DefaultTrace.TraceWarning("initializeTask failed {0}", e); childTrace.AddDatum("initializeTask failed", e); - } - - try - { - this.isSuccessfullyInitialized = await this.initTaskCache.GetAsync( - key: key, - singleValueInitFunc: this.initializeTaskFactory, - forceRefresh: false, - callBackOnForceRefresh: null); - } - catch (DocumentClientException ex) - { - throw Resource.CosmosExceptions.CosmosExceptionFactory.Create( - dce: ex, - trace: trace); + throw; } } } From 64681b73353ab8550fc8c5c982af6e5f0efee200 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Sat, 12 Feb 2022 07:41:19 -0800 Subject: [PATCH 8/8] Update test since EnsureValidClient does not retry 2 times now. --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 1 + .../CosmosGatewayTimeoutTests.cs | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 135e499836..888478bec7 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -1254,6 +1254,7 @@ public void Dispose() if (this.initTaskCache != null) { this.initTaskCache.Dispose(); + this.initTaskCache = null; } DefaultTrace.TraceInformation("DocumentClient with id {0} disposed.", this.traceId); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosGatewayTimeoutTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosGatewayTimeoutTests.cs index a1174a4703..8b4728c122 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosGatewayTimeoutTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosGatewayTimeoutTests.cs @@ -106,7 +106,7 @@ public async Task CosmosHttpClientRetryValidation() } catch (CosmosException rte) { - Assert.IsTrue(handler.Count >= 6); + Assert.IsTrue(handler.Count >= 3, $"HandlerCount: {handler.Count}; Expecte 6"); string message = rte.ToString(); Assert.IsTrue(message.Contains("Start Time"), "Start Time:" + message); Assert.IsTrue(message.Contains("Total Duration"), "Total Duration:" + message); @@ -129,9 +129,10 @@ private class TransientHttpClientCreatorHandler : DelegatingHandler protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { - if (this.Count++ <= 3) + this.Count++; + if (this.Count < 3) { - throw new WebException(); + throw new WebException($"Mocked WebException {this.Count}"); } throw new TaskCanceledException();