From 2f2b68791417e22abfe715832391270835b05d34 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Tue, 15 Mar 2022 07:08:47 -0700 Subject: [PATCH 1/2] Remove call back --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 13 +- .../src/Routing/AsyncCacheNonBlocking.cs | 22 ++-- .../src/Routing/GatewayAddressCache.cs | 21 ++-- .../src/Routing/PartitionKeyRangeCache.cs | 10 +- .../Routing/AsyncCacheNonBlockingTests.cs | 113 +++++++----------- 5 files changed, 73 insertions(+), 106 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index a089d26d5d..0827bf4d9e 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -164,7 +164,7 @@ internal partial class DocumentClient : IDisposable, IAuthorizationTokenProvider private AsyncLazy queryPartitionProvider; private DocumentClientEventSource eventSource; - private Func> initializeTaskFactory; + private Func> initializeTaskFactory; internal AsyncCacheNonBlocking initTaskCache = new AsyncCacheNonBlocking(); private JsonSerializerSettings serializerSettings; @@ -915,22 +915,18 @@ internal virtual void Initialize(Uri serviceEndpoint, // For direct: WFStoreProxy [set in OpenAsync()]. this.eventSource = DocumentClientEventSource.Instance; - this.initializeTaskFactory = () => - { - return TaskHelper.InlineIfPossible( + this.initializeTaskFactory = (_) => TaskHelper.InlineIfPossible( () => this.GetInitializationTaskAsync(storeClientFactory: storeClientFactory), new ResourceThrottleRetryPolicy( this.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests, this.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds)); - }; // Create the task to start the initialize task // Task will be awaited on in the EnsureValidClientAsync Task initTask = this.initTaskCache.GetAsync( key: DocumentClient.DefaultInitTaskKey, singleValueInitFunc: this.initializeTaskFactory, - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + forceRefresh: (_) => false); // ContinueWith on the initialization task is needed for handling the UnobservedTaskException // if this task throws for some reason. Awaiting inside a constructor is not supported and @@ -1449,8 +1445,7 @@ internal virtual async Task EnsureValidClientAsync(ITrace trace) this.isSuccessfullyInitialized = await this.initTaskCache.GetAsync( key: DocumentClient.DefaultInitTaskKey, singleValueInitFunc: this.initializeTaskFactory, - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + forceRefresh: (_) => false); } catch (DocumentClientException ex) { diff --git a/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs b/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs index fa56aa20c3..11fae1697c 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/AsyncCacheNonBlocking.cs @@ -85,9 +85,8 @@ private static bool RemoveNotFoundFromCacheOnException(Exception e) /// public async Task GetAsync( TKey key, - Func> singleValueInitFunc, - Func forceRefresh, - Action callBackOnForceRefresh) + Func> singleValueInitFunc, + Func forceRefresh) { if (this.values.TryGetValue(key, out AsyncLazyWithRefreshTask initialLazyValue)) { @@ -120,8 +119,7 @@ public async Task GetAsync( try { return await initialLazyValue.CreateAndWaitForBackgroundRefreshTaskAsync( - createRefreshTask: singleValueInitFunc, - callBackOnForceRefresh: callBackOnForceRefresh); + createRefreshTask: singleValueInitFunc); } catch (Exception e) { @@ -202,7 +200,7 @@ public bool TryRemove(TKey key) private sealed class AsyncLazyWithRefreshTask { private readonly CancellationToken cancellationToken; - private readonly Func> createValueFunc; + private readonly Func> createValueFunc; private readonly object valueLock = new object(); private readonly object removedFromCacheLock = new object(); @@ -221,7 +219,7 @@ public AsyncLazyWithRefreshTask( } public AsyncLazyWithRefreshTask( - Func> taskFactory, + Func> taskFactory, CancellationToken cancellationToken) { this.cancellationToken = cancellationToken; @@ -252,14 +250,13 @@ public Task GetValueAsync() } this.cancellationToken.ThrowIfCancellationRequested(); - this.value = this.createValueFunc(); + this.value = this.createValueFunc(default); return this.value; } } public async Task CreateAndWaitForBackgroundRefreshTaskAsync( - Func> createRefreshTask, - Action callBackOnForceRefresh) + Func> createRefreshTask) { this.cancellationToken.ThrowIfCancellationRequested(); @@ -284,7 +281,6 @@ public async Task CreateAndWaitForBackgroundRefreshTaskAsync( if (AsyncLazyWithRefreshTask.IsTaskRunning(refresh)) { T result = await refresh; - callBackOnForceRefresh?.Invoke(originalValue, result); return result; } @@ -298,7 +294,7 @@ public async Task CreateAndWaitForBackgroundRefreshTaskAsync( else { createdTask = true; - this.refreshInProgress = createRefreshTask(); + this.refreshInProgress = createRefreshTask(originalValue); refresh = this.refreshInProgress; } } @@ -307,7 +303,6 @@ public async Task CreateAndWaitForBackgroundRefreshTaskAsync( if (!createdTask) { T result = await refresh; - callBackOnForceRefresh?.Invoke(originalValue, result); return result; } @@ -320,7 +315,6 @@ public async Task CreateAndWaitForBackgroundRefreshTaskAsync( this.value = Task.FromResult(itemResult); } - callBackOnForceRefresh?.Invoke(originalValue, itemResult); return itemResult; } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 3e0de11067..53d21ad16f 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -187,17 +187,22 @@ public async Task TryGetAddressesAsync( } PartitionAddressInformation addresses; + PartitionAddressInformation staleAddressInfo = null; if (forceRefreshPartitionAddresses || request.ForceCollectionRoutingMapRefresh) { addresses = await this.serverPartitionAddressCache.GetAsync( key: partitionKeyRangeIdentity, - singleValueInitFunc: () => this.GetAddressesForRangeIdAsync( + singleValueInitFunc: (staleAddresses) => this.GetAddressesForRangeIdAsync( request, partitionKeyRangeIdentity.CollectionRid, partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: forceRefreshPartitionAddresses), - forceRefresh: (_) => true, - callBackOnForceRefresh: (old, updated) => GatewayAddressCache.LogPartitionCacheRefresh(request.RequestContext.ClientRequestStatistics, old, updated)); + forceRefresh: (_) => true); + + if (staleAddressInfo != null) + { + GatewayAddressCache.LogPartitionCacheRefresh(request.RequestContext.ClientRequestStatistics, staleAddressInfo, addresses); + } this.suboptimalServerPartitionTimestamps.TryRemove(partitionKeyRangeIdentity, out DateTime ignoreDateTime); } @@ -205,13 +210,12 @@ public async Task TryGetAddressesAsync( { addresses = await this.serverPartitionAddressCache.GetAsync( key: partitionKeyRangeIdentity, - singleValueInitFunc: () => this.GetAddressesForRangeIdAsync( + singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( request, partitionKeyRangeIdentity.CollectionRid, partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: false), - forceRefresh: (_) => false, - callBackOnForceRefresh: (old, updated) => GatewayAddressCache.LogPartitionCacheRefresh(request.RequestContext.ClientRequestStatistics, old, updated)); + forceRefresh: (_) => false); } int targetReplicaSetSize = this.serviceConfigReader.UserReplicationPolicy.MaxReplicaSetSize; @@ -298,13 +302,12 @@ public async Task UpdateAsync( return await this.serverPartitionAddressCache.GetAsync( key: partitionKeyRangeIdentity, - singleValueInitFunc: () => this.GetAddressesForRangeIdAsync( + singleValueInitFunc: (_) => this.GetAddressesForRangeIdAsync( null, partitionKeyRangeIdentity.CollectionRid, partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: true), - forceRefresh: (_) => true, - callBackOnForceRefresh: null); + forceRefresh: (_) => true); } private async Task> ResolveMasterAsync(DocumentServiceRequest request, bool forceRefresh) diff --git a/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs b/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs index 7c28dd2d2a..a7d9b92bcc 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs @@ -120,13 +120,12 @@ public virtual async Task TryLookupAsync( { return await this.routingMapCache.GetAsync( key: collectionRid, - singleValueInitFunc: () => this.GetRoutingMapForCollectionAsync( + singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync( collectionRid, previousValue, trace, request?.RequestContext?.ClientRequestStatistics), - forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue), - callBackOnForceRefresh: null); + forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue)); } catch (DocumentClientException ex) { @@ -184,13 +183,12 @@ public async Task TryGetRangeByPartitionKeyRangeIdAsync(strin { CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync( key: collectionRid, - singleValueInitFunc: () => this.GetRoutingMapForCollectionAsync( + singleValueInitFunc: (_) => this.GetRoutingMapForCollectionAsync( collectionRid: collectionRid, previousRoutingMap: null, trace: trace, clientSideRequestStatistics: clientSideRequestStatistics), - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + forceRefresh: (_) => false); return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs index 2f3dcc100c..1b9796d019 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Routing/AsyncCacheNonBlockingTests.cs @@ -31,13 +31,12 @@ public async Task ValidateNegativeScenario(bool forceRefresh) AsyncCacheNonBlocking asyncCache = new AsyncCacheNonBlocking(); await asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); throw new NotFoundException("testNotFoundException"); }, - (_) => forceRefresh, - null); + (_) => forceRefresh); } [TestMethod] @@ -48,13 +47,12 @@ public async Task ValidateMultipleBackgroundRefreshesScenario() string expectedValue = "ResponseValue"; string response = await asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); return expectedValue; }, - (_) => false, - null); + (_) => false); Assert.AreEqual(expectedValue, response); @@ -62,13 +60,12 @@ public async Task ValidateMultipleBackgroundRefreshesScenario() { string forceRefreshResponse = await asyncCache.GetAsync( key: "test", - singleValueInitFunc: async () => + singleValueInitFunc: async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); return expectedValue + i; }, - forceRefresh: (_) => true, - callBackOnForceRefresh: null); + forceRefresh: (_) => true); Assert.AreEqual(expectedValue + i, forceRefreshResponse); } @@ -81,21 +78,19 @@ public async Task ValidateNegativeNotAwaitedScenario() AsyncCacheNonBlocking asyncCache = new AsyncCacheNonBlocking(); Task task1 = asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); throw new NotFoundException("testNotFoundException"); }, - (_) => false, - null); + (_) => false); try { await asyncCache.GetAsync( "test", - () => throw new BadRequestException("testBadRequestException"), - (_) => false, - null); + (_) => throw new BadRequestException("testBadRequestException"), + (_) => false); Assert.Fail("Should have thrown a NotFoundException"); } catch (NotFoundException) @@ -113,7 +108,7 @@ public async Task ValidateNotFoundOnBackgroundRefreshRemovesFromCacheScenario() AsyncCacheNonBlocking asyncCache = new AsyncCacheNonBlocking(); string response1 = await asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); return value1; @@ -122,14 +117,13 @@ public async Task ValidateNotFoundOnBackgroundRefreshRemovesFromCacheScenario() { Assert.AreEqual(null, staleValue); return false; - }, - null); + }); Assert.AreEqual(value1, response1); string response2 = await asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); throw new Exception("Should use cached value"); @@ -138,8 +132,7 @@ public async Task ValidateNotFoundOnBackgroundRefreshRemovesFromCacheScenario() { Assert.AreEqual(value1, staleValue); return false; - }, - null); + }); Assert.AreEqual(value1, response2); @@ -148,13 +141,12 @@ public async Task ValidateNotFoundOnBackgroundRefreshRemovesFromCacheScenario() { await asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); throw notFoundException; }, - (_) => true, - null); + (_) => true); Assert.Fail("Should have thrown a NotFoundException"); } catch (NotFoundException exception) @@ -165,13 +157,12 @@ await asyncCache.GetAsync( string valueAfterNotFound = "response4Value"; string response4 = await asyncCache.GetAsync( "test", - async () => + async (_) => { await Task.Delay(TimeSpan.FromMilliseconds(5)); return valueAfterNotFound; }, - (_) => false, - null); + (_) => false); Assert.AreEqual(valueAfterNotFound, response4); } @@ -183,34 +174,30 @@ public async Task ValidateAsyncCacheNonBlocking() AsyncCacheNonBlocking asyncCache = new AsyncCacheNonBlocking(); string result = await asyncCache.GetAsync( "test", - () => Task.FromResult("test2"), - (_) => false, - (x, y) => throw new Exception("Should not be called since there is no refresh")); + (_) => Task.FromResult("test2"), + (_) => false); string cachedResults = await asyncCache.GetAsync( "test", - () => throw new Exception("should not refresh"), - (_) => false, - (x, y) => throw new Exception("Should not be called since there is no refresh")); + (_) => throw new Exception("should not refresh"), + (_) => false); string oldValue = null; - string newValue = null; Task updateTask = asyncCache.GetAsync( key: "test", - singleValueInitFunc: async () => + singleValueInitFunc: async (staleValue) => { + oldValue = staleValue; await Task.Delay(TimeSpan.FromSeconds(1)); return "Test3"; }, - forceRefresh: (_) => true, - callBackOnForceRefresh: (x, y) => { oldValue = x; newValue = y; }); + forceRefresh: (_) => true); Stopwatch concurrentOperationStopwatch = Stopwatch.StartNew(); string concurrentUpdateTask = await asyncCache.GetAsync( "test", - () => throw new Exception("should not refresh"), - (_) => false, - (x, y) => throw new Exception("Should not be called since there is no refresh")); + (_) => throw new Exception("should not refresh"), + (_) => false); Assert.AreEqual("test2", result); concurrentOperationStopwatch.Stop(); @@ -219,7 +206,6 @@ public async Task ValidateAsyncCacheNonBlocking() result = await updateTask; Assert.AreEqual("Test3", result); Assert.AreEqual(oldValue, "test2", "The call back was not done."); - Assert.AreEqual(newValue, "Test3", "The call back was not done."); } [TestMethod] @@ -229,16 +215,14 @@ public async Task ValidateCacheValueIsRemovedAfterException() AsyncCacheNonBlocking asyncCache = new AsyncCacheNonBlocking(); string result = await asyncCache.GetAsync( key: "test", - singleValueInitFunc: () => Task.FromResult("test2"), - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + singleValueInitFunc: (_) => Task.FromResult("test2"), + forceRefresh: (_) => false); Assert.AreEqual("test2", result); string cachedResults = await asyncCache.GetAsync( key: "test", - singleValueInitFunc: () => throw new Exception("should not refresh"), - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + singleValueInitFunc: (_) => throw new Exception("should not refresh"), + forceRefresh: (_) => false); Assert.AreEqual("test2", cachedResults); // Simulate a slow connection on a refresh operation. The async call will @@ -250,7 +234,7 @@ public async Task ValidateCacheValueIsRemovedAfterException() { await asyncCache.GetAsync( key: "test", - singleValueInitFunc: async () => + singleValueInitFunc: async (_) => { while (delayException) { @@ -259,8 +243,7 @@ await asyncCache.GetAsync( throw new NotFoundException("testNotFoundException"); }, - forceRefresh: (_) => true, - callBackOnForceRefresh: null); + forceRefresh: (_) => true); Assert.Fail(); } catch (NotFoundException nfe) @@ -271,9 +254,8 @@ await asyncCache.GetAsync( cachedResults = await asyncCache.GetAsync( key: "test", - singleValueInitFunc: () => throw new Exception("should not refresh"), - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + singleValueInitFunc: (_) => throw new Exception("should not refresh"), + forceRefresh: (_) => false); Assert.AreEqual("test2", cachedResults); delayException = false; @@ -294,13 +276,12 @@ public async Task ValidateConcurrentCreateAsyncCacheNonBlocking() { tasks.Add(Task.Run(() => asyncCache.GetAsync( key: "key", - singleValueInitFunc: () => + singleValueInitFunc: (_) => { Interlocked.Increment(ref totalLazyCalls); return Task.FromResult("Test"); }, - forceRefresh: (_) => false, - callBackOnForceRefresh: (x, y) => throw new Exception("Should not be called since there is no refresh")))); + forceRefresh: (_) => false))); } await Task.WhenAll(tasks); @@ -326,14 +307,13 @@ public async Task ValidateConcurrentCreateWithFailureAsyncCacheNonBlocking() { await asyncCache.GetAsync( key: "key", - singleValueInitFunc: async () => + singleValueInitFunc: async (_) => { Interlocked.Increment(ref totalLazyCalls); await Task.Delay(random.Next(0, 3)); throw new NotFoundException("test"); }, - forceRefresh: (_) => false, - callBackOnForceRefresh: (x, y) => throw new Exception("Should not be called since there is no refresh")); + forceRefresh: (_) => false); Assert.Fail(); } catch (DocumentClientException dce) @@ -358,15 +338,14 @@ public async Task ValidateExceptionScenariosCacheNonBlocking() { await asyncCache.GetAsync( key: "key", - singleValueInitFunc: async () => + singleValueInitFunc: async (_) => { // Use a dummy await to make it simulate a real async network call await Task.CompletedTask; Interlocked.Increment(ref totalLazyCalls); throw new DocumentClientException("test", HttpStatusCode.NotFound, SubStatusCodes.Unknown); }, - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + forceRefresh: (_) => false); Assert.Fail(); } catch (DocumentClientException dce) @@ -382,15 +361,14 @@ await asyncCache.GetAsync( { await asyncCache.GetAsync( key: "key", - singleValueInitFunc: async () => + singleValueInitFunc: async (_) => { // Use a dummy await to make it simulate a real async network call await Task.CompletedTask; Interlocked.Increment(ref totalLazyCalls); throw new DocumentClientException("test", HttpStatusCode.BadRequest, SubStatusCodes.Unknown); }, - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + forceRefresh: (_) => false); Assert.Fail(); } catch (DocumentClientException dce) @@ -403,15 +381,14 @@ await asyncCache.GetAsync( totalLazyCalls = 0; string result = await asyncCache.GetAsync( key: "key", - singleValueInitFunc: async () => + singleValueInitFunc: async (_) => { // Use a dummy await to make it simulate a real async network call await Task.CompletedTask; Interlocked.Increment(ref totalLazyCalls); return "Test3"; }, - forceRefresh: (_) => false, - callBackOnForceRefresh: null); + forceRefresh: (_) => false); Assert.AreEqual(1, totalLazyCalls); Assert.AreEqual("Test3", result); } From 6f28f2fc3e836d02087afc005086e88e40de5e0b Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Tue, 15 Mar 2022 09:51:48 -0700 Subject: [PATCH 2/2] Fix address logging --- .../src/Routing/GatewayAddressCache.cs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index 53d21ad16f..217cc6b2d2 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -192,11 +192,15 @@ public async Task TryGetAddressesAsync( { addresses = await this.serverPartitionAddressCache.GetAsync( key: partitionKeyRangeIdentity, - singleValueInitFunc: (staleAddresses) => this.GetAddressesForRangeIdAsync( - request, - partitionKeyRangeIdentity.CollectionRid, - partitionKeyRangeIdentity.PartitionKeyRangeId, - forceRefresh: forceRefreshPartitionAddresses), + singleValueInitFunc: (currentCachedValue) => + { + staleAddressInfo = currentCachedValue; + return this.GetAddressesForRangeIdAsync( + request, + partitionKeyRangeIdentity.CollectionRid, + partitionKeyRangeIdentity.PartitionKeyRangeId, + forceRefresh: forceRefreshPartitionAddresses); + }, forceRefresh: (_) => true); if (staleAddressInfo != null)