From 75162b329a01d7766a74dcc4730909989cec1ea0 Mon Sep 17 00:00:00 2001 From: j82w Date: Thu, 20 May 2021 16:29:26 -0700 Subject: [PATCH] Availability: Fixes the get account information to stop the background refresh after Client is disposed (#2483) Fixes a bug where the get account information was continuing after the client and GlobalEndpointManager was disposed. It now wires through the cancellation token. Adding the missing using statements which are missing from several tests. This causes the background threads to continue to run after the test is done. --- .../src/GatewayAccountReader.cs | 5 +- .../src/Routing/GlobalEndpointManager.cs | 26 ++- .../CancellationTokenTests.cs | 4 +- .../ExceptionlessTests.cs | 2 +- .../GatewayStoreModelTest.cs | 211 ++++++++++-------- .../GlobalEndpointManagerTest.cs | 147 ++++++++---- .../LocationCacheTests.cs | 98 ++++---- .../GlobalPartitionEndpointManagerTests.cs | 77 ++++--- .../MockSetupsHelper.cs | 18 +- .../RegionFailoverTests.cs | 66 +++--- 10 files changed, 366 insertions(+), 288 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs b/Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs index 2bd73054ff..37d6add024 100644 --- a/Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs +++ b/Microsoft.Azure.Cosmos/src/GatewayAccountReader.cs @@ -81,7 +81,10 @@ await this.cosmosAuthorization.AddAuthorizationHeaderAsync( public async Task InitializeReaderAsync() { AccountProperties databaseAccount = await GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( - this.serviceEndpoint, this.connectionPolicy.PreferredLocations, this.GetDatabaseAccountAsync); + defaultEndpoint: this.serviceEndpoint, + locations: this.connectionPolicy.PreferredLocations, + getDatabaseAccountFn: this.GetDatabaseAccountAsync, + cancellationToken: default); return databaseAccount; } diff --git a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs index c635218eaa..4b0c2da9fb 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/GlobalEndpointManager.cs @@ -105,12 +105,14 @@ public GlobalEndpointManager(IDocumentClientInternal owner, ConnectionPolicy con public static Task GetDatabaseAccountFromAnyLocationsAsync( Uri defaultEndpoint, IList? locations, - Func> getDatabaseAccountFn) + Func> getDatabaseAccountFn, + CancellationToken cancellationToken) { GetAccountPropertiesHelper threadSafeGetAccountHelper = new GetAccountPropertiesHelper( defaultEndpoint, locations?.GetEnumerator(), - getDatabaseAccountFn); + getDatabaseAccountFn, + cancellationToken); return threadSafeGetAccountHelper.GetAccountPropertiesAsync(); } @@ -120,7 +122,7 @@ public static Task GetDatabaseAccountFromAnyLocationsAsync( /// private class GetAccountPropertiesHelper { - private readonly CancellationTokenSource CancellationTokenSource = new CancellationTokenSource(); + private readonly CancellationTokenSource CancellationTokenSource; private readonly Uri DefaultEndpoint; private readonly IEnumerator? Locations; private readonly Func> GetDatabaseAccountFn; @@ -131,11 +133,13 @@ private class GetAccountPropertiesHelper public GetAccountPropertiesHelper( Uri defaultEndpoint, IEnumerator? locations, - Func> getDatabaseAccountFn) + Func> getDatabaseAccountFn, + CancellationToken cancellationToken) { this.DefaultEndpoint = defaultEndpoint; this.Locations = locations; this.GetDatabaseAccountFn = getDatabaseAccountFn; + this.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); } public async Task GetAccountPropertiesAsync() @@ -275,6 +279,7 @@ private async Task GetAndUpdateAccountPropertiesAsync(Uri endpoint) { if (this.CancellationTokenSource.IsCancellationRequested) { + this.LastTransientException = new OperationCanceledException("GlobalEndpointManager: Get account information canceled"); return; } @@ -435,6 +440,11 @@ private async void StartLocationBackgroundRefreshLoop() DefaultTrace.TraceInformation("GlobalEndpointManager: StartLocationBackgroundRefreshWithTimer() - Invoking refresh"); + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + await this.RefreshDatabaseAccountInternalAsync(forceRefresh: false); } catch (Exception ex) @@ -467,6 +477,11 @@ private void OnPreferenceChanged(object sender, NotifyCollectionChangedEventArgs /// private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) { + if (this.cancellationTokenSource.IsCancellationRequested) + { + return; + } + if (this.SkipRefresh(forceRefresh)) { return; @@ -498,7 +513,8 @@ private async Task RefreshDatabaseAccountInternalAsync(bool forceRefresh) singleValueInitFunc: () => GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( this.defaultEndpoint, this.connectionPolicy.PreferredLocations, - this.GetDatabaseAccountAsync), + this.GetDatabaseAccountAsync, + this.cancellationTokenSource.Token), cancellationToken: this.cancellationTokenSource.Token, forceRefresh: true); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs index 3d2d2b26ff..948f7ec9b7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CancellationTokenTests.cs @@ -52,11 +52,11 @@ async Task sendFunc(HttpRequestMessage request) Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); ISessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs index 1e16574152..dc53dfa741 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ExceptionlessTests.cs @@ -297,7 +297,7 @@ async Task sendFunc(HttpRequestMessage httpRequest) return await Task.FromResult(new HttpResponseMessage((HttpStatusCode)responseStatusCode)); } - GatewayStoreModel storeModel = MockGatewayStoreModel(sendFunc); + using GatewayStoreModel storeModel = MockGatewayStoreModel(sendFunc); using (new ActivityScope(Guid.NewGuid())) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs index 58449cee7f..82f7a9e10a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs @@ -116,11 +116,11 @@ public async Task TestRetries() Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); ISessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, @@ -191,14 +191,17 @@ public async Task TestApplySessionForMasterOperation() dsr.Headers.Add(HttpConstants.HttpHeaders.SessionToken, Guid.NewGuid().ToString()); - await GatewayStoreModel.ApplySessionTokenAsync( - dsr, - ConsistencyLevel.Session, - new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, - clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + await GatewayStoreModel.ApplySessionTokenAsync( + dsr, + ConsistencyLevel.Session, + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object); - Assert.IsNull(dsr.Headers[HttpConstants.HttpHeaders.SessionToken]); + Assert.IsNull(dsr.Headers[HttpConstants.HttpHeaders.SessionToken]); + }); } } @@ -214,15 +217,17 @@ await GatewayStoreModel.ApplySessionTokenAsync( dsrQueryPlan.Headers.Add(HttpConstants.HttpHeaders.SessionToken, Guid.NewGuid().ToString()); - await GatewayStoreModel.ApplySessionTokenAsync( - dsrQueryPlan, - ConsistencyLevel.Session, - new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, - clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); - - - Assert.IsNull(dsrQueryPlan.Headers[HttpConstants.HttpHeaders.SessionToken]); + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + await GatewayStoreModel.ApplySessionTokenAsync( + dsrQueryPlan, + ConsistencyLevel.Session, + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object); + + Assert.IsNull(dsrQueryPlan.Headers[HttpConstants.HttpHeaders.SessionToken]); + }); } [TestMethod] @@ -263,15 +268,17 @@ public async Task TestApplySessionForDataOperation() string dsrSessionToken = Guid.NewGuid().ToString(); dsr.Headers.Add(HttpConstants.HttpHeaders.SessionToken, dsrSessionToken); - await GatewayStoreModel.ApplySessionTokenAsync( - dsr, - ConsistencyLevel.Session, - new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, - clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); - + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + await GatewayStoreModel.ApplySessionTokenAsync( + dsr, + ConsistencyLevel.Session, + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object); - Assert.AreEqual(dsrSessionToken, dsr.Headers[HttpConstants.HttpHeaders.SessionToken]); + Assert.AreEqual(dsrSessionToken, dsr.Headers[HttpConstants.HttpHeaders.SessionToken]); + }); } { @@ -286,22 +293,24 @@ await GatewayStoreModel.ApplySessionTokenAsync( Mock sMock = new Mock(); sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken); - await GatewayStoreModel.ApplySessionTokenAsync( - dsrNoSessionToken, - ConsistencyLevel.Session, - sMock.Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, - clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); - - - if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch) - { - Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); - } - else + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => { - Assert.IsNull(dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); - } + await GatewayStoreModel.ApplySessionTokenAsync( + dsrNoSessionToken, + ConsistencyLevel.Session, + sMock.Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object); + + if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch) + { + Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); + } + else + { + Assert.IsNull(dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); + } + }); } { @@ -369,15 +378,17 @@ await GatewayStoreModel.ApplySessionTokenAsync( string sessionToken = Guid.NewGuid().ToString(); dsrSprocExecute.Headers.Add(HttpConstants.HttpHeaders.SessionToken, sessionToken); - await GatewayStoreModel.ApplySessionTokenAsync( - dsrSprocExecute, - ConsistencyLevel.Session, - new Mock().Object, - partitionKeyRangeCache: new Mock(null, null, null).Object, - clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); - - - Assert.AreEqual(sessionToken, dsrSprocExecute.Headers[HttpConstants.HttpHeaders.SessionToken]); + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + await GatewayStoreModel.ApplySessionTokenAsync( + dsrSprocExecute, + ConsistencyLevel.Session, + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object); + + Assert.AreEqual(sessionToken, dsrSprocExecute.Headers[HttpConstants.HttpHeaders.SessionToken]); + }); } [TestMethod] @@ -392,11 +403,11 @@ public async Task TestErrorResponsesProvideBody() Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); ISessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, @@ -454,11 +465,11 @@ private async Task GatewayStoreModel_Exception_UpdateSessionTokenOnKnownExceptio Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); SessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, @@ -518,11 +529,11 @@ private async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownExcep Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); SessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, @@ -567,9 +578,9 @@ private async Task GatewayStoreModel_Exception_NotUpdateSessionTokenOnKnownExcep [TestMethod] public async Task TestSessionTokenForSessionConsistentResourceType() { - GatewayStoreModel storeModel = GetGatewayStoreModelForConsistencyTest(); - - using (DocumentServiceRequest request = + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + using (DocumentServiceRequest request = DocumentServiceRequest.Create( Documents.OperationType.Read, Documents.ResourceType.Collection, @@ -577,9 +588,10 @@ public async Task TestSessionTokenForSessionConsistentResourceType() new MemoryStream(Encoding.UTF8.GetBytes("collection")), AuthorizationTokenType.PrimaryMasterKey, null)) - { - await TestGatewayStoreModelProcessMessageAsync(storeModel, request); - } + { + await this.TestGatewayStoreModelProcessMessageAsync(gatewayStoreModel, request); + } + }); } /// @@ -589,9 +601,9 @@ public async Task TestSessionTokenForSessionConsistentResourceType() [TestMethod] public async Task TestSessionTokenForSessionInconsistentResourceType() { - GatewayStoreModel storeModel = GetGatewayStoreModelForConsistencyTest(); - - using (DocumentServiceRequest request = + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + using (DocumentServiceRequest request = DocumentServiceRequest.Create( Documents.OperationType.Query, Documents.ResourceType.Document, @@ -599,9 +611,10 @@ public async Task TestSessionTokenForSessionInconsistentResourceType() new MemoryStream(Encoding.UTF8.GetBytes("document")), AuthorizationTokenType.PrimaryMasterKey, null)) - { - await TestGatewayStoreModelProcessMessageAsync(storeModel, request); - } + { + await this.TestGatewayStoreModelProcessMessageAsync(gatewayStoreModel, request); + } + }); } /// @@ -611,9 +624,9 @@ public async Task TestSessionTokenForSessionInconsistentResourceType() [TestMethod] public async Task TestSessionTokenAvailability() { - GatewayStoreModel storeModel = GetGatewayStoreModelForConsistencyTest(); - - using (DocumentServiceRequest request = + await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) => + { + using (DocumentServiceRequest request = DocumentServiceRequest.Create( Documents.OperationType.Read, Documents.ResourceType.Collection, @@ -621,22 +634,22 @@ public async Task TestSessionTokenAvailability() new MemoryStream(Encoding.UTF8.GetBytes("collection")), AuthorizationTokenType.PrimaryMasterKey, null)) - { - await TestGatewayStoreModelProcessMessageAsync(storeModel, request); - } - - using (DocumentServiceRequest request = - DocumentServiceRequest.Create( - Documents.OperationType.Query, - Documents.ResourceType.Document, - new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute), - new MemoryStream(Encoding.UTF8.GetBytes("document")), - AuthorizationTokenType.PrimaryMasterKey, - null)) - { - await TestGatewayStoreModelProcessMessageAsync(storeModel, request); - } + { + await this.TestGatewayStoreModelProcessMessageAsync(gatewayStoreModel, request); + } + using (DocumentServiceRequest request = + DocumentServiceRequest.Create( + Documents.OperationType.Query, + Documents.ResourceType.Document, + new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute), + new MemoryStream(Encoding.UTF8.GetBytes("document")), + AuthorizationTokenType.PrimaryMasterKey, + null)) + { + await this.TestGatewayStoreModelProcessMessageAsync(gatewayStoreModel, request); + } + }); } [TestMethod] @@ -664,11 +677,11 @@ private async Task GatewayStoreModel_Exceptionless_UpdateSessionTokenOnKnownResp Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); SessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, @@ -730,11 +743,11 @@ private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownR Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); SessionContainer sessionContainer = new SessionContainer(string.Empty); DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc); - GatewayStoreModel storeModel = new GatewayStoreModel( + using GatewayStoreModel storeModel = new GatewayStoreModel( endpointManager, sessionContainer, ConsistencyLevel.Eventual, @@ -779,9 +792,10 @@ protected override async Task SendAsync(HttpRequestMessage } } - private GatewayStoreModel GetGatewayStoreModelForConsistencyTest() + private async Task GetGatewayStoreModelForConsistencyTest( + Func executeWithGatewayStoreModel) { - Func> messageHandler = async request => + static async Task messageHandler(HttpRequestMessage request) { String content = await request.Content.ReadAsStringAsync(); if (content.Equals("document")) @@ -797,17 +811,16 @@ private GatewayStoreModel GetGatewayStoreModelForConsistencyTest() } else { - IEnumerable enumerable; - Assert.IsFalse(request.Headers.TryGetValues("x-ms-session-token", out enumerable)); + Assert.IsFalse(request.Headers.TryGetValues("x-ms-session-token", out IEnumerable enumerable)); } return new HttpResponseMessage(HttpStatusCode.OK) { Content = new StringContent("Response") }; - }; + } Mock mockDocumentClient = new Mock(); mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo")); mockDocumentClient.Setup(client => client.ConsistencyLevel).Returns(Documents.ConsistencyLevel.Session); - GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); + using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy()); SessionContainer sessionContainer = new SessionContainer(string.Empty); sessionContainer.SetSessionToken( @@ -818,8 +831,8 @@ private GatewayStoreModel GetGatewayStoreModelForConsistencyTest() DocumentClientEventSource eventSource = DocumentClientEventSource.Instance; HttpMessageHandler httpMessageHandler = new MockMessageHandler(messageHandler); - GatewayStoreModel storeModel = new GatewayStoreModel( - endpointManager, + using GatewayStoreModel storeModel = new GatewayStoreModel( + endpointManager, sessionContainer, ConsistencyLevel.Eventual, eventSource, @@ -830,7 +843,7 @@ private GatewayStoreModel GetGatewayStoreModelForConsistencyTest() PartitionKeyRangeCache partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache).Object; storeModel.SetCaches(partitionKeyRangeCache, clientCollectionCache); - return storeModel; + await executeWithGatewayStoreModel(storeModel); } private async Task TestGatewayStoreModelProcessMessageAsync(GatewayStoreModel storeModel, DocumentServiceRequest request) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GlobalEndpointManagerTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GlobalEndpointManagerTest.cs index b8373293fa..891917074d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GlobalEndpointManagerTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GlobalEndpointManagerTest.cs @@ -27,56 +27,104 @@ public class GlobalEndpointManagerTest [TestMethod] public async Task EndpointFailureMockTest() { - // Setup dummpy read locations for the database account - Collection readableLocations = new Collection(); + Environment.SetEnvironmentVariable("MinimumIntervalForNonForceRefreshLocationInMS", "100"); + try + { + // Setup dummpy read locations for the database account + Collection readableLocations = new Collection(); - AccountRegion writeLocation = new AccountRegion(); - writeLocation.Name = "WriteLocation"; - writeLocation.Endpoint = "https://writeendpoint.net/"; + AccountRegion writeLocation = new AccountRegion(); + writeLocation.Name = "WriteLocation"; + writeLocation.Endpoint = "https://writeendpoint.net/"; - AccountRegion readLocation1 = new AccountRegion(); - readLocation1.Name = "ReadLocation1"; - readLocation1.Endpoint = "https://readendpoint1.net/"; + AccountRegion readLocation1 = new AccountRegion(); + readLocation1.Name = "ReadLocation1"; + readLocation1.Endpoint = "https://readendpoint1.net/"; - AccountRegion readLocation2 = new AccountRegion(); - readLocation2.Name = "ReadLocation2"; - readLocation2.Endpoint = "https://readendpoint2.net/"; + AccountRegion readLocation2 = new AccountRegion(); + readLocation2.Name = "ReadLocation2"; + readLocation2.Endpoint = "https://readendpoint2.net/"; - readableLocations.Add(writeLocation); - readableLocations.Add(readLocation1); - readableLocations.Add(readLocation2); + readableLocations.Add(writeLocation); + readableLocations.Add(readLocation1); + readableLocations.Add(readLocation2); - AccountProperties databaseAccount = new AccountProperties(); - databaseAccount.ReadLocationsInternal = readableLocations; + AccountProperties databaseAccount = new AccountProperties(); + databaseAccount.ReadLocationsInternal = readableLocations; - //Setup mock owner "document client" - Mock mockOwner = new Mock(); - mockOwner.Setup(owner => owner.ServiceEndpoint).Returns(new Uri("https://defaultendpoint.net/")); - mockOwner.Setup(owner => owner.GetDatabaseAccountInternalAsync(It.IsAny(), It.IsAny())).ReturnsAsync(databaseAccount); + //Setup mock owner "document client" + Mock mockOwner = new Mock(); + mockOwner.Setup(owner => owner.ServiceEndpoint).Returns(new Uri("https://defaultendpoint.net/")); - //Create connection policy and populate preferred locations - ConnectionPolicy connectionPolicy = new ConnectionPolicy(); - connectionPolicy.PreferredLocations.Add("ReadLocation1"); - connectionPolicy.PreferredLocations.Add("ReadLocation2"); + int getAccountInfoCount = 0; + mockOwner.Setup(owner => owner.GetDatabaseAccountInternalAsync(It.IsAny(), It.IsAny())) + .Callback(() => getAccountInfoCount++) + .ReturnsAsync(databaseAccount); - GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(mockOwner.Object, connectionPolicy); + //Create connection policy and populate preferred locations + ConnectionPolicy connectionPolicy = new ConnectionPolicy(); + connectionPolicy.PreferredLocations.Add("ReadLocation1"); + connectionPolicy.PreferredLocations.Add("ReadLocation2"); - globalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(databaseAccount); - Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation1.Endpoint)); + using (GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(mockOwner.Object, connectionPolicy)) + { + globalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(databaseAccount); + Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation1.Endpoint)); - //Mark each of the read locations as unavailable and validate that the read endpoint switches to the next preferred region / default endpoint. - globalEndpointManager.MarkEndpointUnavailableForRead(globalEndpointManager.ReadEndpoints[0]); - globalEndpointManager.RefreshLocationAsync().Wait(); - Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation2.Endpoint)); + //Mark each of the read locations as unavailable and validate that the read endpoint switches to the next preferred region / default endpoint. + globalEndpointManager.MarkEndpointUnavailableForRead(globalEndpointManager.ReadEndpoints[0]); + await globalEndpointManager.RefreshLocationAsync(); + Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation2.Endpoint)); - globalEndpointManager.MarkEndpointUnavailableForRead(globalEndpointManager.ReadEndpoints[0]); - await globalEndpointManager.RefreshLocationAsync(); - Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], globalEndpointManager.WriteEndpoints[0]); + globalEndpointManager.MarkEndpointUnavailableForRead(globalEndpointManager.ReadEndpoints[0]); + await globalEndpointManager.RefreshLocationAsync(); + Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], globalEndpointManager.WriteEndpoints[0]); - //Sleep a second for the unavailable endpoint entry to expire and background refresh timer to kick in - Thread.Sleep(3000); - await globalEndpointManager.RefreshLocationAsync(); - Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation1.Endpoint)); + getAccountInfoCount = 0; + //Sleep a second for the unavailable endpoint entry to expire and background refresh timer to kick in + await Task.Delay(TimeSpan.FromSeconds(3)); + Assert.IsTrue(getAccountInfoCount > 0, "Callback is not working. There should be at least one call in this time frame."); + + await globalEndpointManager.RefreshLocationAsync(); + Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation1.Endpoint)); + } + + Assert.IsTrue(getAccountInfoCount > 0, "Callback is not working. There should be at least one call in this time frame."); + getAccountInfoCount = 0; + Thread.Sleep(TimeSpan.FromSeconds(3)); + Assert.AreEqual(0, getAccountInfoCount, "There should be no more account calls after the GlobalEndpointManager is disposed"); + } + finally + { + Environment.SetEnvironmentVariable("MinimumIntervalForNonForceRefreshLocationInMS", null); + } + } + + [TestMethod] + public async Task ValidateCancellationTokenLogicForGetDatabaseAccountFromAnyLocationAsync() + { + Uri defaultEndpoint = new Uri("https://testfailover.documents-test.windows-int.net/"); + using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + cancellationTokenSource.Cancel(); + + try + { + await GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( + defaultEndpoint, + locations: new List(){ + "westus", + "southeastasia", + "northcentralus" + }, + getDatabaseAccountFn: (uri) => throw new Exception("The operation should be canceled and never make the network call."), + cancellationTokenSource.Token); + + Assert.Fail("Previous call should have failed"); + } + catch (OperationCanceledException op) + { + Assert.IsTrue(op.Message.Contains("GlobalEndpointManager")); + } } /// @@ -106,7 +154,8 @@ await GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( } throw new Exception("This should never be hit since it should stop after the global endpoint hit the nonretriable exception"); - }); + }, + cancellationToken: default); Assert.Fail("Should throw the UnauthorizedException"); } @@ -136,7 +185,8 @@ await GlobalEndpointManager.GetDatabaseAccountFromAnyLocationsAsync( } throw new Microsoft.Azure.Documents.UnauthorizedException("Mock failed exception"); - }); + }, + cancellationToken: default); Assert.Fail("Should throw the UnauthorizedException"); } @@ -192,7 +242,8 @@ public async Task GetDatabaseAccountFromAnyLocationsMockTestAsync() "southeastasia", "northcentralus" }, - getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri)); + getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri), + cancellationToken: default); Assert.AreEqual(globalEndpointResult, databaseAccount); Assert.AreEqual(0, slowPrimaryRegionHelper.FailedEndpointCount); @@ -213,7 +264,8 @@ public async Task GetDatabaseAccountFromAnyLocationsMockTestAsync() "southeastasia", "northcentralus" }, - getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri)); + getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri), + cancellationToken: default); stopwatch.Stop(); Assert.AreEqual(globalEndpointResult, databaseAccount); @@ -235,7 +287,8 @@ public async Task GetDatabaseAccountFromAnyLocationsMockTestAsync() "southeastasia", "northcentralus" }, - getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri)); + getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri), + cancellationToken: default); Assert.AreEqual(globalEndpointResult, databaseAccount); Assert.AreEqual(3, slowPrimaryRegionHelper.FailedEndpointCount); @@ -255,7 +308,8 @@ public async Task GetDatabaseAccountFromAnyLocationsMockTestAsync() "southeastasia", "northcentralus" }, - getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri)); + getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri), + cancellationToken: default); Assert.AreEqual(globalEndpointResult, databaseAccount); Assert.AreEqual(0, slowPrimaryRegionHelper.FailedEndpointCount); @@ -279,7 +333,8 @@ public async Task GetDatabaseAccountFromAnyLocationsMockTestAsync() "westus6", "westus7", }, - getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri)); + getDatabaseAccountFn: (uri) => slowPrimaryRegionHelper.RequestHelper(uri), + cancellationToken: default); Assert.AreEqual(globalEndpointResult, databaseAccount); Assert.AreEqual(5, slowPrimaryRegionHelper.FailedEndpointCount); @@ -393,7 +448,7 @@ public void ReadLocationRemoveAndAddMockTest() connectionPolicy.PreferredLocations.Add("ReadLocation1"); connectionPolicy.PreferredLocations.Add("ReadLocation2"); - GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(mockOwner.Object, connectionPolicy); + using GlobalEndpointManager globalEndpointManager = new GlobalEndpointManager(mockOwner.Object, connectionPolicy); globalEndpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(databaseAccount); Assert.AreEqual(globalEndpointManager.ReadEndpoints[0], new Uri(readLocation1.Endpoint)); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs index 6e1f483d3f..5bd200ce40 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/LocationCacheTests.cs @@ -44,28 +44,14 @@ public sealed class LocationCacheTests private ReadOnlyCollection preferredLocations; private AccountProperties databaseAccount; private LocationCache cache; - private GlobalEndpointManager endpointManager; private GlobalPartitionEndpointManager partitionKeyRangeLocationCache; private Mock mockedClient; - [TestCleanup] - public void TestCleanup() - { - if (this.endpointManager != null) - { - try - { - this.endpointManager.Dispose(); - } - catch (Exception) { } - } - } - [TestMethod] [Owner("atulk")] public void ValidateWriteEndpointOrderWithClientSideDisableMultipleWriteLocation() { - this.Initialize(false, true, false); + using GlobalEndpointManager endpointManager = this.Initialize(false, true, false); Assert.AreEqual(this.cache.WriteEndpoints[0], LocationCacheTests.Location1Endpoint); Assert.AreEqual(this.cache.WriteEndpoints[1], LocationCacheTests.Location2Endpoint); Assert.AreEqual(this.cache.WriteEndpoints[2], LocationCacheTests.Location3Endpoint); @@ -75,7 +61,7 @@ public void ValidateWriteEndpointOrderWithClientSideDisableMultipleWriteLocation [Owner("atulk")] public void ValidateGetLocation() { - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: false, enableEndpointDiscovery: true, isPreferredLocationsListEmpty: true); @@ -111,11 +97,11 @@ private async Task ValidateRetryOnSessionNotAvailabeWithEndpointDiscoveryDisable { const bool enableEndpointDiscovery = false; - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: useMultipleWriteLocations, enableEndpointDiscovery: enableEndpointDiscovery, isPreferredLocationsListEmpty: isPreferredLocationsListEmpty); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery, endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: isReadRequest, isMasterResourceType: false)) { @@ -130,7 +116,7 @@ await BackoffRetryUtility.ExecuteAsync( if (retryCount == 0) { - Assert.AreEqual(request.RequestContext.LocationEndpointToRoute, this.endpointManager.ReadEndpoints[0]); + Assert.AreEqual(request.RequestContext.LocationEndpointToRoute, endpointManager.ReadEndpoints[0]); } else { @@ -157,9 +143,15 @@ await BackoffRetryUtility.ExecuteAsync( } } - private ClientRetryPolicy CreateClientRetryPolicy(bool enableEndpointDiscovery) + private ClientRetryPolicy CreateClientRetryPolicy( + bool enableEndpointDiscovery, + GlobalEndpointManager endpointManager) { - return new ClientRetryPolicy(this.endpointManager, this.partitionKeyRangeLocationCache, enableEndpointDiscovery, new RetryOptions()); + return new ClientRetryPolicy( + endpointManager, + this.partitionKeyRangeLocationCache, + enableEndpointDiscovery, + new RetryOptions()); } [TestMethod] @@ -175,13 +167,13 @@ private async Task ValidateRetryOnSessionNotAvailabeWithDisableMultipleWriteLoca const bool useMultipleWriteLocations = false; bool enableEndpointDiscovery = true; - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: useMultipleWriteLocations, enableEndpointDiscovery: enableEndpointDiscovery, isPreferredLocationsListEmpty: isPreferredLocationsListEmpty); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery, endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: true, isMasterResourceType: false)) { @@ -252,14 +244,14 @@ private async Task ValidateRetryOnReadSessionNotAvailabeWithEnableMultipleWriteL "location1" }.AsReadOnly(); - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: useMultipleWriteLocations, enableEndpointDiscovery: enableEndpointDiscovery, isPreferredLocationsListEmpty: false, preferedRegionListOverride: preferredList); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery, endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: true, isMasterResourceType: false)) { @@ -328,14 +320,14 @@ private async Task ValidateRetryOnWriteSessionNotAvailabeWithEnableMultipleWrite "location1" }.AsReadOnly(); - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: useMultipleWriteLocations, enableEndpointDiscovery: enableEndpointDiscovery, isPreferredLocationsListEmpty: false, preferedRegionListOverride: preferredList); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery, endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: false, isMasterResourceType: false)) { @@ -401,13 +393,13 @@ await BackoffRetryUtility.ExecuteAsync( [Owner("atulk")] public async Task ValidateRetryOnWriteForbiddenExceptionAsync() { - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: false, enableEndpointDiscovery: true, isPreferredLocationsListEmpty: false); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(true); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery: true, endpointManager: endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: false, isMasterResourceType: false)) { @@ -473,13 +465,13 @@ public async Task ValidateRetryOnDatabaseAccountNotFoundAsync() private async Task ValidateRetryOnDatabaseAccountNotFoundAsync(bool enableMultipleWriteLocations, bool isReadRequest) { - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: enableMultipleWriteLocations, enableEndpointDiscovery: true, isPreferredLocationsListEmpty: false); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(true); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery: true, endpointManager: endpointManager); int expectedRetryCount = isReadRequest || enableMultipleWriteLocations ? 2 : 1; @@ -576,15 +568,15 @@ private async Task ValidateRetryOnHttpExceptionAsync(bool enableMultipleWriteLoc "location1" }.AsReadOnly(); - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: enableMultipleWriteLocations, enableEndpointDiscovery: true, isPreferredLocationsListEmpty: false, preferedRegionListOverride: preferredList, enforceSingleMasterSingleWriteLocation: true); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(true); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery: true, endpointManager: endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: isReadRequest, isMasterResourceType: false)) { @@ -684,7 +676,7 @@ public async Task ClientRetryPolicy_ValidateRetryOnServiceUnavailable( "location1" }.AsReadOnly(); - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations: useMultipleWriteLocations, enableEndpointDiscovery: enableEndpointDiscovery, isPreferredLocationsListEmpty: !usesPreferredLocations, @@ -692,8 +684,8 @@ public async Task ClientRetryPolicy_ValidateRetryOnServiceUnavailable( preferedRegionListOverride: preferredList, enforceSingleMasterSingleWriteLocation: true); - this.endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); - ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery); + endpointManager.InitializeAccountPropertiesAndStartBackgroundRefresh(this.databaseAccount); + ClientRetryPolicy retryPolicy = this.CreateClientRetryPolicy(enableEndpointDiscovery, endpointManager); using (DocumentServiceRequest request = this.CreateRequest(isReadRequest: isReadRequest, isMasterResourceType: false)) { @@ -782,7 +774,7 @@ private static AccountProperties CreateDatabaseAccount( return databaseAccount; } - private void Initialize( + private GlobalEndpointManager Initialize( bool useMultipleWriteLocations, bool enableEndpointDiscovery, bool isPreferredLocationsListEmpty, @@ -833,16 +825,18 @@ private void Initialize( connectionPolicy.PreferredLocations.Add(preferredLocation); } - this.endpointManager = new GlobalEndpointManager(mockedClient.Object, connectionPolicy); + GlobalEndpointManager endpointManager = new GlobalEndpointManager(this.mockedClient.Object, connectionPolicy); if (enablePartitionLevelFailover) { - this.partitionKeyRangeLocationCache = new GlobalPartitionEndpointManagerCore(this.endpointManager); + this.partitionKeyRangeLocationCache = new GlobalPartitionEndpointManagerCore(endpointManager); } else { this.partitionKeyRangeLocationCache = GlobalPartitionEndpointManagerNoOp.Instance; } + + return endpointManager; } private async Task ValidateLocationCacheAsync( @@ -854,7 +848,7 @@ private async Task ValidateLocationCacheAsync( { for (int readLocationIndex = 0; readLocationIndex < 2; readLocationIndex++) { - this.Initialize( + using GlobalEndpointManager endpointManager = this.Initialize( useMultipleWriteLocations, endpointDiscoveryEnabled, isPreferredListEmpty); @@ -865,13 +859,13 @@ private async Task ValidateLocationCacheAsync( for (int i = 0; i < readLocationIndex; i++) { this.cache.MarkEndpointUnavailableForRead(new Uri(this.databaseAccount.ReadLocationsInternal[i].Endpoint)); - this.endpointManager.MarkEndpointUnavailableForRead(new Uri(this.databaseAccount.ReadLocationsInternal[i].Endpoint)); + endpointManager.MarkEndpointUnavailableForRead(new Uri(this.databaseAccount.ReadLocationsInternal[i].Endpoint)); } for (int i = 0; i < writeLocationIndex; i++) { this.cache.MarkEndpointUnavailableForWrite(new Uri(this.databaseAccount.WriteLocationsInternal[i].Endpoint)); - this.endpointManager.MarkEndpointUnavailableForWrite( + endpointManager.MarkEndpointUnavailableForWrite( new Uri(this.databaseAccount.WriteLocationsInternal[i].Endpoint)); } @@ -902,7 +896,7 @@ private async Task ValidateLocationCacheAsync( currentWriteEndpoints.Count > 1, currentReadEndpoints.Count > 1); - await this.ValidateGlobalEndpointLocationCacheRefreshAsync(); + await this.ValidateGlobalEndpointLocationCacheRefreshAsync(endpointManager); this.ValidateRequestEndpointResolution( useMultipleWriteLocations, @@ -994,9 +988,9 @@ private void ValidateEndpointRefresh( } } - private async Task ValidateGlobalEndpointLocationCacheRefreshAsync() + private async Task ValidateGlobalEndpointLocationCacheRefreshAsync(GlobalEndpointManager endpointManager) { - IEnumerable refreshLocations = Enumerable.Range(0, 10).Select(index => Task.Factory.StartNew(() => this.endpointManager.RefreshLocationAsync(false))); + IEnumerable refreshLocations = Enumerable.Range(0, 10).Select(index => Task.Factory.StartNew(() => endpointManager.RefreshLocationAsync(false))); await Task.WhenAll(refreshLocations); @@ -1004,7 +998,7 @@ private async Task ValidateGlobalEndpointLocationCacheRefreshAsync() this.mockedClient.ResetCalls(); - foreach (Task task in Enumerable.Range(0, 10).Select(index => Task.Factory.StartNew(() => this.endpointManager.RefreshLocationAsync(false)))) + foreach (Task task in Enumerable.Range(0, 10).Select(index => Task.Factory.StartNew(() => endpointManager.RefreshLocationAsync(false)))) { await task; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/GlobalPartitionEndpointManagerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/GlobalPartitionEndpointManagerTests.cs index a3f227bdce..a0c2f408e8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/GlobalPartitionEndpointManagerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/GlobalPartitionEndpointManagerTests.cs @@ -9,14 +9,12 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Linq; using System.Net; using System.Net.Http; + using System.Runtime.CompilerServices; using System.Text; - using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; - using Newtonsoft.Json; [TestClass] public class GlobalPartitionEndpointManagerTests @@ -69,41 +67,48 @@ public async Task TestWriteForbiddenScenarioAsync() TransportClientHandlerFactory = (original) => mockTransport.Object, }; - using CosmosClient customClient = new CosmosClient( + using (CosmosClient customClient = new CosmosClient( globalEndpoint, Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), - cosmosClientOptions); + cosmosClientOptions)) + { - Container container = customClient.GetContainer(databaseName, containerName); - ToDoActivity toDoActivity = new ToDoActivity() - { - Id = "TestItem", - Pk = "TestPk" - }; - ItemResponse response = await container.CreateItemAsync(toDoActivity, new Cosmos.PartitionKey(toDoActivity.Pk)); - Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); - mockTransport.VerifyAll(); - mockHttpHandler.VerifyAll(); + Container container = customClient.GetContainer(databaseName, containerName); - // Clears all the setups. No network calls should be done on the next operation. - mockHttpHandler.Reset(); - mockTransport.Reset(); - mockTransport.Setup(x => x.Dispose()); + ToDoActivity toDoActivity = new ToDoActivity() + { + Id = "TestItem", + Pk = "TestPk" + }; - MockSetupsHelper.SetupCreateItemResponse( - mockTransport, - secondaryRegionPrimaryReplicaUri); + ItemResponse response = await container.CreateItemAsync(toDoActivity, new Cosmos.PartitionKey(toDoActivity.Pk)); + Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + mockTransport.VerifyAll(); + mockHttpHandler.VerifyAll(); - ToDoActivity toDoActivity2 = new ToDoActivity() - { - Id = "TestItem2", - Pk = "TestPk" - }; + // Clears all the setups. No network calls should be done on the next operation. + mockHttpHandler.Reset(); + mockTransport.Reset(); + mockTransport.Setup(x => x.Dispose()); - response = await container.CreateItemAsync(toDoActivity2, new Cosmos.PartitionKey(toDoActivity2.Pk)); - Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + MockSetupsHelper.SetupCreateItemResponse( + mockTransport, + secondaryRegionPrimaryReplicaUri); + + ToDoActivity toDoActivity2 = new ToDoActivity() + { + Id = "TestItem2", + Pk = "TestPk" + }; + + response = await container.CreateItemAsync(toDoActivity2, new Cosmos.PartitionKey(toDoActivity2.Pk)); + Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + } + + await Task.Delay(TimeSpan.FromMinutes(5)); + Console.WriteLine("done"); } [TestMethod] @@ -121,7 +126,7 @@ public async Task TestServiceUnavailableExceptionScenarioAsync() out TransportAddressUri primaryRegionprimaryReplicaUri); Mock mockTransport = new Mock(MockBehavior.Strict); - + MockSetupsHelper.SetupServiceUnavailableException( mockTransport, primaryRegionprimaryReplicaUri); @@ -154,10 +159,10 @@ public async Task TestServiceUnavailableExceptionScenarioAsync() TransportClientHandlerFactory = (original) => mockTransport.Object, }; - using CosmosClient customClient = new CosmosClient( - globalEndpoint, - Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), - cosmosClientOptions); + using CosmosClient customClient = new CosmosClient( + globalEndpoint, + Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), + cosmosClientOptions); Container container = customClient.GetContainer(databaseName, containerName); @@ -307,9 +312,9 @@ private static void SetupAccountAndCacheOperations( out ResourceId containerResourceId, out Mock mockHttpHandler, out IReadOnlyList primaryRegionPartitionKeyRangeIds, - out TransportAddressUri primaryRegionprimaryReplicaUri) + out TransportAddressUri primaryRegionprimaryReplicaUri, + [CallerMemberName] string accountName = nameof(GlobalPartitionEndpointManagerTests)) { - string accountName = "testAccount"; string primaryRegionNameForUri = "eastus"; secondaryRegionNameForUri = "westus"; globalEndpoint = $"https://{accountName}.documents.azure.com:443/"; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/MockSetupsHelper.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/MockSetupsHelper.cs index dae233209f..57ef31767a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/MockSetupsHelper.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/MockSetupsHelper.cs @@ -246,11 +246,7 @@ internal static void SetupWriteForbiddenException( TransportAddressUri physicalUri) { mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(physicalUri, It.IsAny())) - .Returns(() => - { - Console.WriteLine($"WriteForbiddenThrown: {physicalUri}"); - throw new ForbiddenException($"Mock write forbidden exception on URI:{physicalUri}", SubStatusCodes.WriteForbidden); - }); + .Returns(() => throw new ForbiddenException($"Mock write forbidden exception on URI:{physicalUri}", SubStatusCodes.WriteForbidden)); } internal static void SetupServiceUnavailableException( @@ -258,11 +254,7 @@ internal static void SetupServiceUnavailableException( TransportAddressUri physicalUri) { mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(physicalUri, It.IsAny())) - .Returns(() => - { - Console.WriteLine($"WriteForbiddenThrown: {physicalUri}"); - throw new ServiceUnavailableException($"Mock write forbidden exception on URI:{physicalUri}", physicalUri.Uri); - }); + .Returns(() => throw new ServiceUnavailableException($"Mock write forbidden exception on URI:{physicalUri}", physicalUri.Uri)); } internal static void SetupRequestTimeoutException( @@ -270,11 +262,7 @@ internal static void SetupRequestTimeoutException( TransportAddressUri physicalUri) { mockTransportClient.Setup(x => x.InvokeResourceOperationAsync(physicalUri, It.IsAny())) - .Returns(() => - { - Console.WriteLine($"RequestTimeoutThrown: {physicalUri}"); - throw new RequestTimeoutException($"Mock request timeout exception on URI:{physicalUri}", physicalUri.Uri); - }); + .Returns(() => throw new RequestTimeoutException($"Mock request timeout exception on URI:{physicalUri}", physicalUri.Uri)); } internal static void SetupCreateItemResponse( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/RegionFailoverTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/RegionFailoverTests.cs index 519681628b..70cf0950c0 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/RegionFailoverTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/PartitionKeyRangeFailoverTests/RegionFailoverTests.cs @@ -25,7 +25,7 @@ public async Task TestHttpRequestExceptionScenarioAsync() // testhost.dll.config sets it to 2 seconds which causes it to always expire before retrying. Remove the override. System.Configuration.ConfigurationManager.AppSettings["UnavailableLocationsExpirationTimeInSeconds"] = "500"; - string accountName = "testAccount"; + string accountName = nameof(TestHttpRequestExceptionScenarioAsync); string primaryRegionNameForUri = "eastus"; string secondaryRegionNameForUri = "westus"; string globalEndpoint = $"https://{accountName}.documents.azure.com:443/"; @@ -95,13 +95,13 @@ public async Task TestHttpRequestExceptionScenarioAsync() int count = 0; mockHttpHandler.Setup(x => x.SendAsync( - It.Is(x => x.RequestUri == new Uri(secondaryRegionEndpiont)), + It.Is(x => x.RequestUri == new Uri(secondaryRegionEndpiont)), It.IsAny())) .Returns((request, cancellationToken) => { // Simulate the legacy gateway being down. After 40 requests simulate the write region pointing to new location. count++; - if(count < 2) + if (count < 2) { return Task.FromResult(MockSetupsHelper.CreateStrongAccount(accountName, writeRegion, readRegions)); } @@ -109,7 +109,7 @@ public async Task TestHttpRequestExceptionScenarioAsync() { return Task.FromResult(MockSetupsHelper.CreateStrongAccount(accountName, writeRegionFailedOver, readRegionsFailedOver)); } - }); + }); MockSetupsHelper.SetupContainerProperties( @@ -167,43 +167,47 @@ public async Task TestHttpRequestExceptionScenarioAsync() TransportClientHandlerFactory = (original) => mockTransport.Object, }; - CosmosClient customClient = new CosmosClient( + using (CosmosClient customClient = new CosmosClient( globalEndpoint, Convert.ToBase64String(Encoding.UTF8.GetBytes(Guid.NewGuid().ToString())), - cosmosClientOptions); + cosmosClientOptions)) + { + Container container = customClient.GetContainer(databaseName, containerName); - Container container = customClient.GetContainer(databaseName, containerName); + ToDoActivity toDoActivity = new ToDoActivity() + { + Id = "TestItem", + Pk = "TestPk" + }; - ToDoActivity toDoActivity = new ToDoActivity() - { - Id = "TestItem", - Pk = "TestPk" - }; + ItemResponse response = await container.CreateItemAsync(toDoActivity, new Cosmos.PartitionKey(toDoActivity.Pk)); + Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + mockTransport.VerifyAll(); + mockHttpHandler.VerifyAll(); - ItemResponse response = await container.CreateItemAsync(toDoActivity, new Cosmos.PartitionKey(toDoActivity.Pk)); - Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); - mockTransport.VerifyAll(); - mockHttpHandler.VerifyAll(); + // Clears all the setups. No network calls should be done on the next operation. + mockHttpHandler.Reset(); + mockTransport.Reset(); - // Clears all the setups. No network calls should be done on the next operation. - mockHttpHandler.Reset(); - mockTransport.Reset(); + MockSetupsHelper.SetupCreateItemResponse( + mockTransport, + secondaryRegionPrimaryReplicaUri); - MockSetupsHelper.SetupCreateItemResponse( - mockTransport, - secondaryRegionPrimaryReplicaUri); + ToDoActivity toDoActivity2 = new ToDoActivity() + { + Id = "TestItem2", + Pk = "TestPk" + }; - ToDoActivity toDoActivity2 = new ToDoActivity() - { - Id = "TestItem2", - Pk = "TestPk" - }; + response = await container.CreateItemAsync(toDoActivity2, new Cosmos.PartitionKey(toDoActivity2.Pk)); + Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + mockTransport.Setup(x => x.Dispose()); - response = await container.CreateItemAsync(toDoActivity2, new Cosmos.PartitionKey(toDoActivity2.Pk)); - Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + // Reset it back to the override to avoid impacting other tests. + System.Configuration.ConfigurationManager.AppSettings["UnavailableLocationsExpirationTimeInSeconds"] = "2"; + } - // Reset it back to the override to avoid impacting other tests. - System.Configuration.ConfigurationManager.AppSettings["UnavailableLocationsExpirationTimeInSeconds"] = "2"; + await Task.Delay(TimeSpan.FromMinutes(2)); } } }