From 1123290246bf659cb6543fbe515a82bf0a276751 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 22 Mar 2022 01:32:17 +0000 Subject: [PATCH 1/2] Fixes a bug causing session tokens for PKRangeId 0 to be overridden by session token for another partition on upserts --- .../src/Handler/TransportHandler.cs | 11 +- .../CosmosItemSessionTokenTests.cs | 286 ++++++++++++++++++ .../Utils/TransportClientHelper.cs | 8 +- 3 files changed, 294 insertions(+), 11 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs diff --git a/Microsoft.Azure.Cosmos/src/Handler/TransportHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/TransportHandler.cs index d17d74faf1..f791d173c4 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/TransportHandler.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/TransportHandler.cs @@ -119,9 +119,7 @@ internal async Task ProcessMessageAsync( DocumentServiceResponse response = null; try { - response = request.OperationType == OperationType.Upsert - ? await this.ProcessUpsertAsync(storeProxy, serviceRequest, cancellationToken) - : await storeProxy.ProcessMessageAsync(serviceRequest, cancellationToken); + response = await storeProxy.ProcessMessageAsync(serviceRequest, cancellationToken); } finally { @@ -151,12 +149,5 @@ internal static ResponseMessage AggregateExceptionConverter(AggregateException a return null; } - - private async Task ProcessUpsertAsync(IStoreModel storeProxy, DocumentServiceRequest serviceRequest, CancellationToken cancellationToken) - { - DocumentServiceResponse response = await storeProxy.ProcessMessageAsync(serviceRequest, cancellationToken); - this.client.DocumentClient.CaptureSessionToken(serviceRequest, response); - return response; - } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs new file mode 100644 index 0000000000..8d96d2b8ec --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs @@ -0,0 +1,286 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Globalization; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Cosmos; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using PartitionKey = Documents.PartitionKey; + using Microsoft.Azure.Cosmos.Diagnostics; + using System.IO; + using System.Net; + + [TestClass] + public class CosmosItemSessionTokenTests : BaseCosmosClientHelper + { + private Container Container = null; + private ContainerProperties containerSettings = null; + + [TestInitialize] + public async Task TestInitialize() + { + await base.TestInit(validateSinglePartitionKeyRangeCacheCall: true); + string PartitionKey = "/pk"; + this.containerSettings = new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey); + ContainerResponse response = await this.database.CreateContainerAsync( + this.containerSettings, + cancellationToken: this.cancellationToken); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Container); + Assert.IsNotNull(response.Resource); + this.Container = response; + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + [TestMethod] + public async Task CreateDropItemTest() + { + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + ItemResponse response = await this.Container.CreateItemAsync(item: testItem); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Resource); + Assert.IsNotNull(response.Diagnostics); + long? lsnAfterCreate = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterCreate); + CosmosTraceDiagnostics diagnostics = (CosmosTraceDiagnostics)response.Diagnostics; + Assert.IsFalse(diagnostics.IsGoneExceptionHit()); + Assert.IsFalse(string.IsNullOrEmpty(diagnostics.ToString())); + Assert.IsTrue(diagnostics.GetClientElapsedTime() > TimeSpan.Zero); + + response = await this.Container.ReadItemAsync(testItem.id, new Cosmos.PartitionKey(testItem.pk)); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Resource); + Assert.IsNotNull(response.Diagnostics); + Assert.IsFalse(string.IsNullOrEmpty(response.Diagnostics.ToString())); + Assert.IsTrue(response.Diagnostics.GetClientElapsedTime() > TimeSpan.Zero); + long? lsnAfterRead = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterRead); + Assert.AreEqual(lsnAfterCreate.Value, lsnAfterRead.Value); + + Assert.IsNotNull(response.Headers.GetHeaderValue(Documents.HttpConstants.HttpHeaders.MaxResourceQuota)); + Assert.IsNotNull(response.Headers.GetHeaderValue(Documents.HttpConstants.HttpHeaders.CurrentResourceQuotaUsage)); + ItemResponse deleteResponse = await this.Container.DeleteItemAsync(partitionKey: new Cosmos.PartitionKey(testItem.pk), id: testItem.id); + Assert.IsNotNull(deleteResponse); + Assert.IsNotNull(response.Diagnostics); + Assert.IsFalse(string.IsNullOrEmpty(response.Diagnostics.ToString())); + Assert.IsTrue(response.Diagnostics.GetClientElapsedTime() > TimeSpan.Zero); + long? lsnAfterDelete = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterDelete); + Assert.IsTrue(lsnAfterDelete.Value > lsnAfterCreate.Value); + } + + [TestMethod] + public async Task ReplaceItemStreamTest() + { + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + using (Stream stream = TestCommon.SerializerCore.ToStream(testItem)) + { + //Create the item + using (ResponseMessage response = await this.Container.CreateItemStreamAsync(partitionKey: new Cosmos.PartitionKey(testItem.pk), streamPayload: stream)) + { + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + } + } + + long? lsnAfterCreate = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterCreate); + + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + using (Stream stream = TestCommon.SerializerCore.ToStream(testItem)) + { + //Replace a non-existing item. It should fail, and not throw an exception. + using (ResponseMessage response = await this.Container.ReplaceItemStreamAsync( + partitionKey: new Cosmos.PartitionKey("SomeNonExistingId"), + id: "SomeNonExistingId", + streamPayload: stream)) + { + Assert.IsFalse(response.IsSuccessStatusCode); + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.NotFound, response.StatusCode, response.ErrorMessage); + + // Session token should be captured for NotFound with SubStatusCode 0 + long? lsnAfterNotFound = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterNotFound); + Assert.AreEqual(lsnAfterCreate.Value, lsnAfterNotFound.Value); + } + } + + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + //Updated the taskNum field + testItem.taskNum = 9001; + using (Stream stream = TestCommon.SerializerCore.ToStream(testItem)) + { + using (ResponseMessage response = await this.Container.ReplaceItemStreamAsync(partitionKey: new Cosmos.PartitionKey(testItem.pk), id: testItem.id, streamPayload: stream)) + { + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + } + + long? lsnAfterReplace= await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterReplace); + Assert.IsTrue(lsnAfterReplace.Value > lsnAfterCreate.Value); + + using (ResponseMessage deleteResponse = await this.Container.DeleteItemStreamAsync(partitionKey: new Cosmos.PartitionKey(testItem.pk), id: testItem.id)) + { + Assert.IsNotNull(deleteResponse); + Assert.AreEqual(deleteResponse.StatusCode, HttpStatusCode.NoContent); + } + + long? lsnAfterDelete = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterDelete); + Assert.IsTrue(lsnAfterDelete.Value > lsnAfterReplace.Value); + } + } + + [TestMethod] + public async Task UpsertItemTest() + { + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + ItemResponse response = await this.Container.UpsertItemAsync(testItem, partitionKey: new Cosmos.PartitionKey(testItem.pk)); + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.Created, response.StatusCode); + Assert.IsNotNull(response.Headers.Session); + long? lsnAfterCreate = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterCreate); + + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + //Updated the taskNum field + testItem.taskNum = 9001; + response = await this.Container.UpsertItemAsync(testItem, partitionKey: new Cosmos.PartitionKey(testItem.pk)); + + Assert.IsNotNull(response); + Assert.AreEqual(HttpStatusCode.OK, response.StatusCode); + Assert.IsNotNull(response.Headers.Session); + + long? lsnAfterUpsert = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterUpsert); + Assert.IsTrue(lsnAfterUpsert.Value > lsnAfterCreate.Value); + } + + [TestMethod] + public async Task NoSessionTokenCaptureForThrottledUpsertRequestsTest() + { + ToDoActivity testItem = ToDoActivity.CreateRandomToDoActivity(); + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + ItemResponse response = await this.Container.CreateItemAsync(item: testItem); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Resource); + Assert.IsNotNull(response.Diagnostics); + string sessionTokenHeaderValue = response.Headers[HttpConstants.HttpHeaders.SessionToken]; + Assert.IsNotNull(sessionTokenHeaderValue); + + long? lsnAfterCreate = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNotNull(lsnAfterCreate); + + this.ResetSessionToken(); + Assert.IsNull(await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk))); + + Container throttledContainer = TransportClientHelper.GetContainerWithIntercepter( + this.database.Id, + this.Container.Id, + (uri, resourceOperation, documentServiceRequest) => { }, + useGatewayMode: false, + (uri, resourceOperation, documentServiceRequest) => + { + StoreResponse throttledResponse = TransportClientHelper.ReturnThrottledStoreResponseOnItemOperation( + uri, resourceOperation, documentServiceRequest, Guid.NewGuid(), string.Empty); + + throttledResponse.Headers.Add(HttpConstants.HttpHeaders.SessionToken, sessionTokenHeaderValue); + + return throttledResponse; + }, + this.cosmosClient.DocumentClient.sessionContainer); + + try + { + //Updated the taskNum field + testItem.taskNum = 9001; + response = await throttledContainer.UpsertItemAsync(testItem, partitionKey: new Cosmos.PartitionKey(testItem.pk)); + } + catch (CosmosException cosmosException) + { + Assert.AreEqual(HttpStatusCode.TooManyRequests, cosmosException.StatusCode); + } + + long? lsnAfterThrottledRequest = await this.GetLSNFromSessionContainer(new PartitionKey(testItem.pk)); + Assert.IsNull(lsnAfterThrottledRequest); + } + + private async Task GetPKRangeIdForPartitionKey( + PartitionKey pkValue) + { + DocumentFeedResponse pkRanges = await this.cosmosClient.DocumentClient.ReadPartitionKeyRangeFeedAsync( + UriFactory.CreateDocumentCollectionUri(this.Container.Database.Id, this.Container.Id)); + List maxExclusiveBoundaries = pkRanges.Select(pkRange => pkRange.MaxExclusive).ToList(); + + string effectivePK1 = pkValue.InternalKey.GetEffectivePartitionKeyString(this.containerSettings.PartitionKey); + int pkIndex = 0; + while (pkIndex < maxExclusiveBoundaries.Count && string.Compare(effectivePK1, maxExclusiveBoundaries[pkIndex]) >= 0) + { + ++pkIndex; + } + + if (pkIndex == maxExclusiveBoundaries.Count) + { + throw new Exception("Failed to find the range"); + } + + return pkIndex.ToString(CultureInfo.InvariantCulture); + } + + private async Task> GetLSNFromSessionContainer(PartitionKey pkValue) + { + string path = $"dbs/{this.Container.Database.Id}/colls/{this.Container.Id}"; + string pkRangeId = await this.GetPKRangeIdForPartitionKey(pkValue); + DocumentServiceRequest dummyRequest = new DocumentServiceRequest( + OperationType.Read, + ResourceType.Document, + path, + body: null, + AuthorizationTokenType.PrimaryMasterKey, + headers: null); + + ISessionToken sessionToken = this.cosmosClient.DocumentClient.sessionContainer.ResolvePartitionLocalSessionToken( + dummyRequest, + pkRangeId); + + return sessionToken?.LSN; + } + + private void ResetSessionToken() + { + string path = $"dbs/{this.Container.Database.Id}/colls/{this.Container.Id}"; + this.cosmosClient.DocumentClient.sessionContainer.ClearTokenByCollectionFullname(path); + + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TransportClientHelper.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TransportClientHelper.cs index a302b518f0..162450b034 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TransportClientHelper.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TransportClientHelper.cs @@ -59,7 +59,8 @@ internal static Container GetContainerWithIntercepter( string containerId, Action interceptor, bool useGatewayMode = false, - Func interceptorWithStoreResult = null) + Func interceptorWithStoreResult = null, + ISessionContainer sessionContainer = null) { CosmosClient clientWithIntercepter = TestCommon.CreateCosmosClient( builder => @@ -69,6 +70,11 @@ internal static Container GetContainerWithIntercepter( builder.WithConnectionModeGateway(); } + if (sessionContainer != null) + { + builder.WithSessionContainer(sessionContainer); + } + builder.WithTransportClientHandlerFactory(transportClient => new TransportClientWrapper( transportClient, interceptor, From 38a3dca1e089970e32706758a1c50b2d6126fd5a Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 22 Mar 2022 14:15:32 +0000 Subject: [PATCH 2/2] Reacted to code review feedback --- .../CosmosItemSessionTokenTests.cs | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs index 8d96d2b8ec..9b77802c01 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosItemSessionTokenTests.cs @@ -5,9 +5,6 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests { using System; - using System.Collections.Generic; - using System.Globalization; - using System.Linq; using System.Threading.Tasks; using Microsoft.Azure.Documents; using Microsoft.Azure.Cosmos; @@ -16,6 +13,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using Microsoft.Azure.Cosmos.Diagnostics; using System.IO; using System.Net; + using System.Threading; + using Microsoft.Azure.Cosmos.Routing; [TestClass] public class CosmosItemSessionTokenTests : BaseCosmosClientHelper @@ -235,26 +234,12 @@ public async Task NoSessionTokenCaptureForThrottledUpsertRequestsTest() Assert.IsNull(lsnAfterThrottledRequest); } - private async Task GetPKRangeIdForPartitionKey( - PartitionKey pkValue) + private async Task GetPKRangeIdForPartitionKey(PartitionKey pkValue) { - DocumentFeedResponse pkRanges = await this.cosmosClient.DocumentClient.ReadPartitionKeyRangeFeedAsync( - UriFactory.CreateDocumentCollectionUri(this.Container.Database.Id, this.Container.Id)); - List maxExclusiveBoundaries = pkRanges.Select(pkRange => pkRange.MaxExclusive).ToList(); + CollectionRoutingMap collectionRoutingMap = await ((ContainerInternal)this.Container).GetRoutingMapAsync(CancellationToken.None); + string effectivePK = pkValue.InternalKey.GetEffectivePartitionKeyString(this.containerSettings.PartitionKey); - string effectivePK1 = pkValue.InternalKey.GetEffectivePartitionKeyString(this.containerSettings.PartitionKey); - int pkIndex = 0; - while (pkIndex < maxExclusiveBoundaries.Count && string.Compare(effectivePK1, maxExclusiveBoundaries[pkIndex]) >= 0) - { - ++pkIndex; - } - - if (pkIndex == maxExclusiveBoundaries.Count) - { - throw new Exception("Failed to find the range"); - } - - return pkIndex.ToString(CultureInfo.InvariantCulture); + return collectionRoutingMap.GetRangeByEffectivePartitionKey(effectivePK).Id; } private async Task> GetLSNFromSessionContainer(PartitionKey pkValue)