diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 932ea0c2d6..dd3c9463d1 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -949,6 +949,8 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFac this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache); this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy); + gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache); + if (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway) { this.StoreModel = this.GatewayStoreModel; diff --git a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs index f6c34d1083..a0ecdfc258 100644 --- a/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs +++ b/Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs @@ -10,10 +10,13 @@ namespace Microsoft.Azure.Cosmos using System.Linq; using System.Net; using System.Net.Http; + using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.Routing; + using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Collections; using Newtonsoft.Json; @@ -28,6 +31,10 @@ internal class GatewayStoreModel : IStoreModel, IDisposable private GatewayStoreClient gatewayStoreClient; + // Caches to resolve the PartitionKeyRange from request. For Session Token Optimization. + private ClientCollectionCache clientCollectionCache; + private PartitionKeyRangeCache partitionKeyRangeCache; + public GatewayStoreModel( GlobalEndpointManager endpointManager, ISessionContainer sessionContainer, @@ -49,10 +56,12 @@ public GatewayStoreModel( public virtual async Task ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default) { - GatewayStoreModel.ApplySessionToken( + await GatewayStoreModel.ApplySessionTokenAsync( request, this.defaultConsistencyLevel, - this.sessionContainer); + this.sessionContainer, + this.partitionKeyRangeCache, + this.clientCollectionCache); DocumentServiceResponse response; try @@ -143,6 +152,13 @@ public virtual async Task GetDatabaseAccountAsync(Func> TryResolveSessionTokenAsync(DocumentServiceRequest request, + ISessionContainer sessionContainer, + PartitionKeyRangeCache partitionKeyRangeCache, + ClientCollectionCache clientCollectionCache) + { + if (request == null) + { + throw new ArgumentNullException(nameof(request)); + } + + if (sessionContainer == null) + { + throw new ArgumentNullException(nameof(sessionContainer)); + } + + if (partitionKeyRangeCache == null) + { + throw new ArgumentNullException(nameof(partitionKeyRangeCache)); + } + + if (clientCollectionCache == null) + { + throw new ArgumentNullException(nameof(clientCollectionCache)); + } + + if (request.ResourceType.IsPartitioned()) + { + (bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(request: request, + sessionContainer: sessionContainer, + partitionKeyRangeCache: partitionKeyRangeCache, + clientCollectionCache: clientCollectionCache, + refreshCache: false); + + if (isSuccess && sessionContainer is SessionContainer gatewaySessionContainer) + { + request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange; + string localSessionToken = gatewaySessionContainer.ResolvePartitionLocalSessionTokenForGateway(request, partitionKeyRange.Id); + if (!string.IsNullOrEmpty(localSessionToken)) + { + return new Tuple(true, localSessionToken); + } + } + } + + return new Tuple(false, null); + } + + private static async Task> TryResolvePartitionKeyRangeAsync(DocumentServiceRequest request, + ISessionContainer sessionContainer, + PartitionKeyRangeCache partitionKeyRangeCache, + ClientCollectionCache clientCollectionCache, + bool refreshCache) + { + if (refreshCache) + { + request.ForceMasterRefresh = true; + request.ForceNameCacheRefresh = true; + } + + PartitionKeyRange partitonKeyRange = null; + ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(request, CancellationToken.None, NoOpTrace.Singleton); + + string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey]; + if (partitionKeyString != null) + { + CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId, + previousValue: null, + request: request, + cancellationToken: CancellationToken.None, + NoOpTrace.Singleton); + + if (refreshCache && collectionRoutingMap != null) + { + collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId, + previousValue: collectionRoutingMap, + request: request, + cancellationToken: CancellationToken.None, + NoOpTrace.Singleton); + } + + partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(request: request, + partitionKeyString: partitionKeyString, + collectionCacheUptoDate: false, + collection: collection, + routingMap: collectionRoutingMap); + } + else if (request.PartitionKeyRangeIdentity != null) + { + PartitionKeyRangeIdentity partitionKeyRangeId = request.PartitionKeyRangeIdentity; + partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(collection.ResourceId, + partitionKeyRangeId.ToString(), + NoOpTrace.Singleton, + refreshCache); + } + + if (partitonKeyRange == null) + { + if (refreshCache) + { + return new Tuple(false, null); + } + + // need to refresh cache. Maybe split happened. + return await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(request: request, + sessionContainer: sessionContainer, + partitionKeyRangeCache: partitionKeyRangeCache, + clientCollectionCache: clientCollectionCache, + refreshCache: true); + } + + return new Tuple(true, partitonKeyRange); + } + // DEVNOTE: This can be replace with ReplicatedResourceClient.IsMasterOperation on next Direct sync internal static bool IsMasterOperation( ResourceType resourceType, diff --git a/Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs b/Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs index a423002b1f..271068302f 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs @@ -452,7 +452,7 @@ private async Task TryResolveServerPartitionAsync( object effectivePartitionKeyStringObject = null; if (partitionKeyString != null) { - range = this.TryResolveServerPartitionByPartitionKey( + range = AddressResolver.TryResolveServerPartitionByPartitionKey( request, partitionKeyString, collectionCacheIsUptodate, @@ -548,7 +548,7 @@ private PartitionKeyRange TryResolveSinglePartitionCollection( // due to parallel usage of V3 SDK and a possible storage or throughput split // The current client might be legacy and not aware of this. // In such case route the request to the first partition - return this.TryResolveServerPartitionByPartitionKey( + return AddressResolver.TryResolveServerPartitionByPartitionKey( request, "[]", // This corresponds to first partition collectionCacheIsUptoDate, @@ -629,7 +629,7 @@ private async Task TryResolveServerPartitionByPartitionKeyRang return new ResolutionResult(partitionKeyRange, addresses, identity); } - private PartitionKeyRange TryResolveServerPartitionByPartitionKey( + internal static PartitionKeyRange TryResolveServerPartitionByPartitionKey( DocumentServiceRequest request, string partitionKeyString, bool collectionCacheUptoDate, diff --git a/Microsoft.Azure.Cosmos/src/SessionContainer.cs b/Microsoft.Azure.Cosmos/src/SessionContainer.cs index 93f1de9fdc..9fbd3f88c8 100644 --- a/Microsoft.Azure.Cosmos/src/SessionContainer.cs +++ b/Microsoft.Azure.Cosmos/src/SessionContainer.cs @@ -49,6 +49,11 @@ public ISessionToken ResolvePartitionLocalSessionToken(DocumentServiceRequest re return SessionContainer.ResolvePartitionLocalSessionToken(this.state, request, partitionKeyRangeId); } + public string ResolvePartitionLocalSessionTokenForGateway(DocumentServiceRequest request, string partitionKeyRangeId) + { + return SessionContainer.ResolvePartitionLocalSessionTokenForGateway(this.state, request, partitionKeyRangeId); + } + public void ClearTokenByCollectionFullname(string collectionFullname) { SessionContainer.ClearTokenByCollectionFullname(this.state, collectionFullname); @@ -139,6 +144,33 @@ private static ISessionToken ResolvePartitionLocalSessionToken(SessionContainerS return SessionTokenHelper.ResolvePartitionLocalSessionToken(request, partitionKeyRangeId, SessionContainer.GetPartitionKeyRangeIdToTokenMap(self, request)); } + private static string ResolvePartitionLocalSessionTokenForGateway(SessionContainerState self, + DocumentServiceRequest request, + string partitionKeyRangeId) + { + ConcurrentDictionary partitionKeyRangeIdToTokenMap = SessionContainer.GetPartitionKeyRangeIdToTokenMap(self, request); + if (partitionKeyRangeIdToTokenMap != null) + { + if (partitionKeyRangeIdToTokenMap.TryGetValue(partitionKeyRangeId, out ISessionToken sessionToken)) + { + return partitionKeyRangeId + ":" + sessionToken.ConvertToString(); + } + else if (request.RequestContext.ResolvedPartitionKeyRange.Parents != null) + { + for (int parentIndex = request.RequestContext.ResolvedPartitionKeyRange.Parents.Count - 1; parentIndex >= 0; parentIndex--) + { + if (partitionKeyRangeIdToTokenMap.TryGetValue(request.RequestContext.ResolvedPartitionKeyRange.Parents[parentIndex], + out sessionToken)) + { + return partitionKeyRangeId + ":" + sessionToken.ConvertToString(); + } + } + } + } + + return null; + } + private static void ClearTokenByCollectionFullname(SessionContainerState self, string collectionFullname) { if (!string.IsNullOrEmpty(collectionFullname)) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/GatewaySessionTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/GatewaySessionTokenTests.cs new file mode 100644 index 0000000000..94b1602675 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/GatewaySessionTokenTests.cs @@ -0,0 +1,144 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Net; + using System.Net.Http; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class GatewaySessionTokenTests : BaseCosmosClientHelper + { + private ContainerInternal Container = null; + private const string PartitionKey = "/pk"; + + [TestInitialize] + public async Task TestInitialize() + { + this.cosmosClient = TestCommon.CreateCosmosClient(useGateway: true); + this.database = await this.cosmosClient.CreateDatabaseAsync( + id: Guid.NewGuid().ToString()); + ContainerResponse response = await this.database.CreateContainerAsync( + new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey), + throughput: 20000, + cancellationToken: this.cancellationToken); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Container); + Assert.IsNotNull(response.Resource); + this.Container = (ContainerInlineCore)response; + + // Create items with different + for (int i = 0; i < 500; i++) + { + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity(); + item.pk = "Status" + i.ToString(); + item.id = i.ToString(); + ItemResponse itemResponse = await this.Container.CreateItemAsync(item); + Assert.AreEqual(HttpStatusCode.Created, itemResponse.StatusCode); + } + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + [TestMethod] + public async Task TestGatewayModelSession() + { + ContainerProperties containerProperties = await this.Container.GetCachedContainerPropertiesAsync(false, NoOpTrace.Singleton, CancellationToken.None); + + ISessionContainer sessionContainer = this.cosmosClient.DocumentClient.sessionContainer; + string docLink = "dbs/" + this.database.Id + "/colls/" + containerProperties.Id + "/docs/3"; + Documents.Collections.INameValueCollection headers = new StoreRequestNameValueCollection(); + headers.Set(HttpConstants.HttpHeaders.PartitionKey, "[\"Status3\"]"); + + DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Read, ResourceType.Document, docLink, AuthorizationTokenType.PrimaryMasterKey, headers); + string globalSessionToken = sessionContainer.ResolveGlobalSessionToken(request); + Assert.IsTrue(globalSessionToken.Split(',').Length > 1); + + await GatewayStoreModel.ApplySessionTokenAsync(request, + Cosmos.ConsistencyLevel.Session, + sessionContainer, + await this.cosmosClient.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton), + await this.cosmosClient.DocumentClient.GetCollectionCacheAsync(NoOpTrace.Singleton)); + + string sessionToken = request.Headers[HttpConstants.HttpHeaders.SessionToken]; + Assert.IsTrue(!string.IsNullOrEmpty(sessionToken) && sessionToken.Split(',').Length == 1); + } + + [TestMethod] + public async Task GatewaySameSessionTokenTest() + { + string createSessionToken = null; + GatewaySessionTokenTests.HttpClientHandlerHelper httpClientHandler = new HttpClientHandlerHelper + { + ResponseCallBack = (result) => + { + HttpResponseMessage response = result.Result; + if (response.StatusCode != HttpStatusCode.Created) + { + return response; + } + + response.Headers.TryGetValues("x-ms-session-token", out IEnumerable sessionTokens); + foreach (string singleToken in sessionTokens) + { + createSessionToken = singleToken; + break; + } + return response; + } + }; + + using (CosmosClient client = TestCommon.CreateCosmosClient(builder => builder + .WithConnectionModeGateway() + .WithConsistencyLevel(Cosmos.ConsistencyLevel.Session) + .WithHttpClientFactory(() => new HttpClient(httpClientHandler)))) + { + Container container = client.GetContainer(this.database.Id, this.Container.Id); + + ToDoActivity item = ToDoActivity.CreateRandomToDoActivity("Status1001", "1001"); + ItemResponse itemResponse = await container.CreateItemAsync(item); + + // Read back the created Item and check if the session token is identical. + string docLink = "dbs/" + this.database.Id + "/colls/" + this.Container.Id + "/docs/1001"; + Documents.Collections.INameValueCollection headers = new StoreRequestNameValueCollection(); + headers.Set(HttpConstants.HttpHeaders.PartitionKey, "[\"Status1001\"]"); + + DocumentServiceRequest request = DocumentServiceRequest.Create(OperationType.Read, ResourceType.Document, docLink, AuthorizationTokenType.PrimaryMasterKey, headers); + await GatewayStoreModel.ApplySessionTokenAsync(request, + Cosmos.ConsistencyLevel.Session, + client.DocumentClient.sessionContainer, + await client.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton), + await client.DocumentClient.GetCollectionCacheAsync(NoOpTrace.Singleton)); + + string readSessionToken = request.Headers[HttpConstants.HttpHeaders.SessionToken]; + Assert.AreEqual(readSessionToken, createSessionToken); + } + } + + private class HttpClientHandlerHelper : DelegatingHandler + { + public HttpClientHandlerHelper() : base(new HttpClientHandler()) + { + } + + public Func, HttpResponseMessage> ResponseCallBack { get; set; } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + return base.SendAsync(request, cancellationToken).ContinueWith(this.ResponseCallBack); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs index 503ee03f81..0bee04db2b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/GatewayStoreModelTest.cs @@ -147,7 +147,7 @@ public async Task TestRetries() } [TestMethod] - public void TestApplySessionForMasterOperation() + public async Task TestApplySessionForMasterOperation() { List resourceTypes = new List() { @@ -190,10 +190,12 @@ public void TestApplySessionForMasterOperation() dsr.Headers.Add(HttpConstants.HttpHeaders.SessionToken, Guid.NewGuid().ToString()); - GatewayStoreModel.ApplySessionToken( + await GatewayStoreModel.ApplySessionTokenAsync( dsr, ConsistencyLevel.Session, - new Mock().Object); + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); Assert.IsNull(dsr.Headers[HttpConstants.HttpHeaders.SessionToken]); } @@ -211,16 +213,19 @@ public void TestApplySessionForMasterOperation() dsrQueryPlan.Headers.Add(HttpConstants.HttpHeaders.SessionToken, Guid.NewGuid().ToString()); - GatewayStoreModel.ApplySessionToken( + await GatewayStoreModel.ApplySessionTokenAsync( dsrQueryPlan, ConsistencyLevel.Session, - new Mock().Object); + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); + Assert.IsNull(dsrQueryPlan.Headers[HttpConstants.HttpHeaders.SessionToken]); } [TestMethod] - public void TestApplySessionForDataOperation() + public async Task TestApplySessionForDataOperation() { List resourceTypes = new List() { @@ -257,10 +262,13 @@ public void TestApplySessionForDataOperation() string dsrSessionToken = Guid.NewGuid().ToString(); dsr.Headers.Add(HttpConstants.HttpHeaders.SessionToken, dsrSessionToken); - GatewayStoreModel.ApplySessionToken( + await GatewayStoreModel.ApplySessionTokenAsync( dsr, ConsistencyLevel.Session, - new Mock().Object); + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); + Assert.AreEqual(dsrSessionToken, dsr.Headers[HttpConstants.HttpHeaders.SessionToken]); @@ -274,12 +282,22 @@ public void TestApplySessionForDataOperation() Mock sMock = new Mock(); sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken); - GatewayStoreModel.ApplySessionToken( + await GatewayStoreModel.ApplySessionTokenAsync( dsrNoSessionToken, ConsistencyLevel.Session, - sMock.Object); + sMock.Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); + - Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); + if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch) + { + Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); + } + else + { + Assert.IsNull(dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]); + } } } @@ -297,10 +315,13 @@ public void TestApplySessionForDataOperation() string sessionToken = Guid.NewGuid().ToString(); dsrSprocExecute.Headers.Add(HttpConstants.HttpHeaders.SessionToken, sessionToken); - GatewayStoreModel.ApplySessionToken( + await GatewayStoreModel.ApplySessionTokenAsync( dsrSprocExecute, ConsistencyLevel.Session, - new Mock().Object); + new Mock().Object, + partitionKeyRangeCache: new Mock(null, null, null).Object, + clientCollectionCache: new Mock(new SessionContainer("testhost"), this.GetGatewayStoreModelForConsistencyTest(), null, null).Object); + Assert.AreEqual(sessionToken, dsrSprocExecute.Headers[HttpConstants.HttpHeaders.SessionToken]); } @@ -751,6 +772,10 @@ private GatewayStoreModel GetGatewayStoreModelForConsistencyTest() null, MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(httpMessageHandler))); + ClientCollectionCache clientCollectionCache = new Mock(new SessionContainer("testhost"), storeModel, null, null).Object; + PartitionKeyRangeCache partitionKeyRangeCache = new Mock(null, storeModel, clientCollectionCache).Object; + storeModel.SetCaches(partitionKeyRangeCache, clientCollectionCache); + return storeModel; }