From a6011084d04720f49d41b46e2fc9ca93b4c57a24 Mon Sep 17 00:00:00 2001 From: Jake Willey Date: Thu, 31 Mar 2022 06:01:41 -0700 Subject: [PATCH] Availability: Adds optimization to reduce metadata calls for addresses --- .../src/Routing/GatewayAddressCache.cs | 22 +++- .../GatewayAddressCacheTests.cs | 122 +++++++++++++++--- .../Utils/MockCosmosUtil.cs | 38 ++++++ 3 files changed, 160 insertions(+), 22 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs index c9b78133c2..4e4c1a79d5 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GatewayAddressCache.cs @@ -193,7 +193,7 @@ public async Task TryGetAddressesAsync( addresses = await this.serverPartitionAddressCache.GetAsync( key: partitionKeyRangeIdentity, singleValueInitFunc: (currentCachedValue) => - { + { staleAddressInfo = currentCachedValue; return this.GetAddressesForRangeIdAsync( request, @@ -201,7 +201,18 @@ public async Task TryGetAddressesAsync( partitionKeyRangeIdentity.PartitionKeyRangeId, forceRefresh: forceRefreshPartitionAddresses); }, - forceRefresh: (_) => true); + forceRefresh: (currentCachedValue) => + { + int cachedHashCode = request?.RequestContext?.LastPartitionAddressInformationHashCode ?? 0; + if (cachedHashCode == 0) + { + return true; + } + + // The cached value is different then the previous access hash then assume + // another request already updated the cache since there is a new value in the cache + return currentCachedValue.GetHashCode() == cachedHashCode; + }); if (staleAddressInfo != null) { @@ -222,6 +233,13 @@ public async Task TryGetAddressesAsync( forceRefresh: (_) => false); } + // Always save the hash code. This is used to determine if another request already updated the cache. + // This helps reduce latency by avoiding uncessary cache refreshes. + if (request?.RequestContext != null) + { + request.RequestContext.LastPartitionAddressInformationHashCode = addresses.GetHashCode(); + } + int targetReplicaSetSize = this.serviceConfigReader.UserReplicationPolicy.MaxReplicaSetSize; if (addresses.AllAddresses.Count() < targetReplicaSetSize) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs index 433940e63a..260f5069a1 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayAddressCacheTests.cs @@ -4,20 +4,20 @@ namespace Microsoft.Azure.Cosmos { using System; - using System.Net.Http; - using System.Threading.Tasks; - using System.Threading; - using System.Net; - using System.Text; - using System.Collections.ObjectModel; using System.Collections.Generic; + using System.Collections.ObjectModel; using System.Linq; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using Moq; - using Microsoft.Azure.Documents; + using System.Net; + using System.Net.Http; + using System.Text; + using System.Threading; + using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Cosmos.Tests; using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; /// /// Tests for . @@ -54,7 +54,7 @@ public void TestGatewayAddressCacheAutoRefreshOnSuboptimalPartition() httpClient.Timeout = TimeSpan.FromSeconds(120); GatewayAddressCache cache = new GatewayAddressCache( new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), - Documents.Client.Protocol.Https, + Documents.Client.Protocol.Tcp, this.mockTokenProvider.Object, this.mockServiceConfigReader.Object, MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), @@ -89,7 +89,7 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() httpClient.Timeout = TimeSpan.FromSeconds(120); GatewayAddressCache cache = new GatewayAddressCache( new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), - Documents.Client.Protocol.Https, + Documents.Client.Protocol.Tcp, this.mockTokenProvider.Object, this.mockServiceConfigReader.Object, MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), @@ -119,6 +119,89 @@ public async Task TestGatewayAddressCacheUpdateOnConnectionResetAsync() Assert.IsNotNull(addresses.AllAddresses.Select(address => address.PhysicalUri == "https://blabla5.com")); } + [TestMethod] + public async Task TestGatewayAddressCacheAvoidCacheRefresWhenAlreadyUpdatedAsync() + { + Mock mockHttpHandler = new Mock(MockBehavior.Strict); + string oldAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/4s"; + string newAddress = "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/5s"; + mockHttpHandler.SetupSequence(x => x.SendAsync( + It.IsAny(), + It.IsAny())) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s", + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + oldAddress, + })) + .Returns(MockCosmosUtil.CreateHttpResponseOfAddresses(new List() + { + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/1p", + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/2s", + "rntbd://dummytenant.documents.azure.com:14003/apps/APPGUID/services/SERVICEGUID/partitions/PARTITIONGUID/replicas/3s", + newAddress, + })); + + HttpClient httpClient = new HttpClient(new HttpHandlerHelper(mockHttpHandler.Object)); + GatewayAddressCache cache = new GatewayAddressCache( + new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), + Documents.Client.Protocol.Tcp, + this.mockTokenProvider.Object, + this.mockServiceConfigReader.Object, + MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), + suboptimalPartitionForceRefreshIntervalInSeconds: 2, + enableTcpConnectionEndpointRediscovery: true); + + DocumentServiceRequest request1 = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); + DocumentServiceRequest request2 = DocumentServiceRequest.Create(OperationType.Invalid, ResourceType.Address, AuthorizationTokenType.Invalid); + + PartitionAddressInformation request1Addresses = await cache.TryGetAddressesAsync( + request: request1, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + PartitionAddressInformation request2Addresses = await cache.TryGetAddressesAsync( + request: request2, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: false, + cancellationToken: CancellationToken.None); + + Assert.AreEqual(request1Addresses, request2Addresses); + Assert.AreEqual(4, request1Addresses.AllAddresses.Count()); + Assert.AreEqual(1, request1Addresses.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(0, request1Addresses.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + // check if the addresss is updated + request1Addresses = await cache.TryGetAddressesAsync( + request: request1, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: true, + cancellationToken: CancellationToken.None); + + // Even though force refresh is true it will just use the new cache + // value rather than doing a gateway call to do another refresh since the value + // already changed from the last cache access + request2Addresses = await cache.TryGetAddressesAsync( + request: request2, + partitionKeyRangeIdentity: this.testPartitionKeyRangeIdentity, + serviceIdentity: this.serviceIdentity, + forceRefreshPartitionAddresses: true, + cancellationToken: CancellationToken.None); + + Assert.AreEqual(request1Addresses, request2Addresses); + Assert.AreEqual(4, request1Addresses.AllAddresses.Count()); + Assert.AreEqual(0, request1Addresses.AllAddresses.Count(x => x.PhysicalUri == oldAddress)); + Assert.AreEqual(1, request1Addresses.AllAddresses.Count(x => x.PhysicalUri == newAddress)); + + mockHttpHandler.VerifyAll(); + } + + [TestMethod] [Timeout(2000)] public void GlobalAddressResolverUpdateAsyncSynchronizationTest() @@ -155,7 +238,7 @@ public void GlobalAddressResolverUpdateAsyncSynchronizationTest() routingMapProvider: null, serviceConfigReader: this.mockServiceConfigReader.Object, connectionPolicy: connectionPolicy, - httpClient: MockCosmosUtil.CreateCosmosHttpClient(() =>new HttpClient(messageHandler))); + httpClient: MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler))); ConnectionStateListener connectionStateListener = new ConnectionStateListener(globalAddressResolver); connectionStateListener.OnConnectionEvent(ConnectionEvent.ReadEof, DateTime.Now, new Documents.Rntbd.ServerKey(new Uri("https://endpoint.azure.com:4040/"))); @@ -173,11 +256,11 @@ public void GlobalAddressResolverUpdateAsyncSynchronizationTest() public async Task GatewayAddressCacheInNetworkRequestTestAsync() { FakeMessageHandler messageHandler = new FakeMessageHandler(); - HttpClient httpClient = new HttpClient(messageHandler); + HttpClient httpClient = new(messageHandler); httpClient.Timeout = TimeSpan.FromSeconds(120); GatewayAddressCache cache = new GatewayAddressCache( new Uri(GatewayAddressCacheTests.DatabaseAccountApiEndpoint), - Documents.Client.Protocol.Https, + Documents.Client.Protocol.Tcp, this.mockTokenProvider.Object, this.mockServiceConfigReader.Object, MockCosmosUtil.CreateCosmosHttpClient(() => httpClient), @@ -192,7 +275,6 @@ public async Task GatewayAddressCacheInNetworkRequestTestAsync() false, CancellationToken.None); - Assert.IsFalse(legacyRequest.IsLocalRegion); // Header indicates the request is from the same azure region. @@ -234,14 +316,14 @@ protected override Task SendAsync(HttpRequestMessage reques { List
addresses = new List
() { - new Address() { IsPrimary = true, PhysicalUri = "https://blabla.com", Protocol = RuntimeConstants.Protocols.HTTPS, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }, - new Address() { IsPrimary = false, PhysicalUri = "https://blabla3.com", Protocol = RuntimeConstants.Protocols.HTTPS, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }, - new Address() { IsPrimary = false, PhysicalUri = "https://blabla2.com", Protocol = RuntimeConstants.Protocols.HTTPS, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }, + new Address() { IsPrimary = true, PhysicalUri = "https://blabla.com", Protocol = RuntimeConstants.Protocols.RNTBD, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }, + new Address() { IsPrimary = false, PhysicalUri = "https://blabla3.com", Protocol = RuntimeConstants.Protocols.RNTBD, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }, + new Address() { IsPrimary = false, PhysicalUri = "https://blabla2.com", Protocol = RuntimeConstants.Protocols.RNTBD, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }, }; if (this.returnFullReplicaSet) { - addresses.Add(new Address() { IsPrimary = false, PhysicalUri = "https://blabla4.com", Protocol = RuntimeConstants.Protocols.HTTPS, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }); + addresses.Add(new Address() { IsPrimary = false, PhysicalUri = "https://blabla4.com", Protocol = RuntimeConstants.Protocols.RNTBD, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }); this.returnFullReplicaSet = false; } else @@ -252,7 +334,7 @@ protected override Task SendAsync(HttpRequestMessage reques if (this.returnUpdatedAddresses) { addresses.RemoveAll(address => address.IsPrimary == true); - addresses.Add(new Address() { IsPrimary = true, PhysicalUri = "https://blabla5.com", Protocol = RuntimeConstants.Protocols.HTTPS, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }); + addresses.Add(new Address() { IsPrimary = true, PhysicalUri = "https://blabla5.com", Protocol = RuntimeConstants.Protocols.RNTBD, PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" }); this.returnUpdatedAddresses = false; } else diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockCosmosUtil.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockCosmosUtil.cs index 24fb287503..4a4447d0e3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockCosmosUtil.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Utils/MockCosmosUtil.cs @@ -8,6 +8,9 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Net; + using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; @@ -114,5 +117,40 @@ public static Mock GetPartitionRoutingHelperMock(string )).Returns(Task.FromResult(true)); return partitionRoutingHelperMock; } + + public static Task CreateHttpResponseOfAddresses(List physicalUris) + { + List
addresses = new List
(); + for (int i = 0; i < physicalUris.Count; i++) + { + addresses.Add(new Address() + { + IsPrimary = i == 0, + PhysicalUri = physicalUris[i], + Protocol = RuntimeConstants.Protocols.RNTBD, + PartitionKeyRangeId = "YxM9ANCZIwABAAAAAAAAAA==" + }); + }; + + FeedResource
addressFeedResource = new FeedResource
() + { + Id = "YxM9ANCZIwABAAAAAAAAAA==", + SelfLink = "dbs/YxM9AA==/colls/YxM9ANCZIwA=/docs/YxM9ANCZIwABAAAAAAAAAA==/", + Timestamp = DateTime.Now, + InnerCollection = new Collection
(addresses), + }; + + StringBuilder feedResourceString = new StringBuilder(); + addressFeedResource.SaveTo(feedResourceString); + + StringContent content = new StringContent(feedResourceString.ToString()); + HttpResponseMessage responseMessage = new HttpResponseMessage() + { + StatusCode = HttpStatusCode.OK, + Content = content, + }; + + return Task.FromResult(responseMessage); + } } }