Skip to content

Commit

Permalink
Session: Removes Global Session Token on GW request if we can't resol…
Browse files Browse the repository at this point in the history
…ve the scoped session t… (#2975)

The .NET SDK is sending the global session token if it does not have the partition key range id in the session token cache. This can cause the request to fail from the header being too large. Once the session token is added to the session token cache it is properly filtered for future requests. This impacts both point operations and query for the first operation going to that partition key range id.

Solution:
If there is no session token for the specificied partition key range id or the parent range ids then no session token should be sent because it is the first request to that partition range id.
  • Loading branch information
asketagarwal authored Jan 14, 2022
1 parent 03041f6 commit 67cbd89
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 45 deletions.
7 changes: 1 addition & 6 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,7 @@ internal static async Task ApplySessionTokenAsync(
partitionKeyRangeCache,
clientCollectionCache);

if (!isSuccess)
{
sessionToken = sessionContainer.ResolveGlobalSessionToken(request);
}

if (!string.IsNullOrEmpty(sessionToken))
if (isSuccess && !string.IsNullOrEmpty(sessionToken))
{
request.Headers[HttpConstants.HttpHeaders.SessionToken] = sessionToken;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Newtonsoft.Json;
using System.Collections.ObjectModel;

/// <summary>
/// Tests for <see cref="GatewayStoreModel"/>.
Expand Down Expand Up @@ -253,8 +254,7 @@ public async Task TestApplySessionForDataOperation(bool multiMaster)
List<ResourceType> resourceTypes = new List<ResourceType>()
{
ResourceType.Document,
ResourceType.Conflict,
ResourceType.Batch
ResourceType.Conflict
};

List<OperationType> operationTypes = new List<OperationType>()
Expand All @@ -263,7 +263,8 @@ public async Task TestApplySessionForDataOperation(bool multiMaster)
OperationType.Delete,
OperationType.Read,
OperationType.Upsert,
OperationType.Replace
OperationType.Replace,
OperationType.Batch
};

foreach (ResourceType resourceType in resourceTypes)
Expand Down Expand Up @@ -301,31 +302,33 @@ await GatewayStoreModel.ApplySessionTokenAsync(

{
// Verify when user does not set session token
DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.CreateFromName(
operationType,
"Test",
resourceType,
AuthorizationTokenType.PrimaryMasterKey);
DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.Create(operationType,
resourceType,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey);

string dsrSessionToken = Guid.NewGuid().ToString();
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken);
SessionContainer sessionContainer = new SessionContainer(string.Empty);
sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

Mock<IGlobalEndpointManager> globalEndpointManager = new Mock<IGlobalEndpointManager>();
globalEndpointManager.Setup(gem => gem.CanUseMultipleWriteLocations(It.Is<DocumentServiceRequest>(drs => drs == dsrNoSessionToken))).Returns(multiMaster);
await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) =>
{
dsrNoSessionToken.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_1" };
await GatewayStoreModel.ApplySessionTokenAsync(
dsrNoSessionToken,
ConsistencyLevel.Session,
sMock.Object,
sessionContainer,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: globalEndpointManager.Object);
if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch || multiMaster)
{
Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
Assert.AreEqual("range_1:1#9#4=8#5=7", dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
else
{
Expand All @@ -336,18 +339,19 @@ await GatewayStoreModel.ApplySessionTokenAsync(

{
// Verify when partition key range is configured
DocumentServiceRequest dsr = DocumentServiceRequest.CreateFromName(
operationType,
"Test",
resourceType,
AuthorizationTokenType.PrimaryMasterKey);

string partitionKeyRangeId = "1";
dsr.Headers[WFConstants.BackendHeaders.PartitionKeyRangeId] = new PartitionKeyRangeIdentity(partitionKeyRangeId).ToHeader();

string dsrSessionToken = Guid.NewGuid().ToString();
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsr)).Returns(dsrSessionToken);
string partitionKeyRangeId = "range_1";
DocumentServiceRequest dsr = DocumentServiceRequest.Create(operationType,
resourceType,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey,
new StoreRequestNameValueCollection() { { WFConstants.BackendHeaders.PartitionKeyRangeId, new PartitionKeyRangeIdentity(partitionKeyRangeId).ToHeader() } });


SessionContainer sessionContainer = new SessionContainer(string.Empty);
sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

ContainerProperties containerProperties = ContainerProperties.CreateWithResourceId("ccZ1ANCszwk=");
containerProperties.Id = "TestId";
Expand All @@ -364,19 +368,19 @@ await GatewayStoreModel.ApplySessionTokenAsync(
containerProperties.ResourceId,
partitionKeyRangeId,
It.IsAny<ITrace>(),
false)).Returns(Task.FromResult(new PartitionKeyRange()));
false)).Returns(Task.FromResult(new PartitionKeyRange { Id = "range_1" }));

await GatewayStoreModel.ApplySessionTokenAsync(
dsr,
ConsistencyLevel.Session,
sMock.Object,
sessionContainer,
partitionKeyRangeCache: mockPartitionKeyRangeCache.Object,
clientCollectionCache: mockCollectionCahce.Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

if (dsr.IsReadOnlyRequest || dsr.OperationType == OperationType.Batch)
{
Assert.AreEqual(dsrSessionToken, dsr.Headers[HttpConstants.HttpHeaders.SessionToken]);
Assert.AreEqual("range_1:1#9#4=8#5=7", dsr.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
else
{
Expand Down Expand Up @@ -424,33 +428,35 @@ public async Task TestRequestOverloadRemovesSessionToken(bool multiMaster, bool
INameValueCollection headers = new StoreRequestNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Eventual.ToString());

DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.CreateFromName(
isWriteRequest ? OperationType.Create : OperationType.Read,
"Test",
ResourceType.Document,
AuthorizationTokenType.PrimaryMasterKey,
headers);
DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.Create(isWriteRequest ? OperationType.Create : OperationType.Read,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1/docs/doc1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey,
headers);

string dsrSessionToken = Guid.NewGuid().ToString();
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken);
SessionContainer sessionContainer = new SessionContainer(string.Empty);
sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

Mock<IGlobalEndpointManager> globalEndpointManager = new Mock<IGlobalEndpointManager>();
globalEndpointManager.Setup(gem => gem.CanUseMultipleWriteLocations(It.Is<DocumentServiceRequest>(drs => drs == dsrNoSessionToken))).Returns(multiMaster);
await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) =>
{
dsrNoSessionToken.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_1" };
await GatewayStoreModel.ApplySessionTokenAsync(
dsrNoSessionToken,
ConsistencyLevel.Session,
sMock.Object,
sessionContainer,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: globalEndpointManager.Object);
if (isWriteRequest && multiMaster)
{
// Multi master write requests should not lower the consistency and remove the session token
Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
Assert.AreEqual("range_1:1#9#4=8#5=7", dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
else
{
Expand Down Expand Up @@ -881,6 +887,83 @@ private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownR
}
}

[TestMethod]
[Owner("askagarw")]
public async Task GatewayStoreModel_AvoidGlobalSessionToken()
{
Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));
using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
using GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
ConsistencyLevel.Session,
eventSource,
null,
MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient()));
Mock<ClientCollectionCache> clientCollectionCache = new Mock<ClientCollectionCache>(new SessionContainer("testhost"), storeModel, null, null);
Mock<PartitionKeyRangeCache> partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(null, storeModel, clientCollectionCache.Object);

sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

using (DocumentServiceRequest dsr = DocumentServiceRequest.Create(OperationType.Query,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey))
{
// pkrange 1 : which has session token
dsr.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_1" };
await GatewayStoreModel.ApplySessionTokenAsync(dsr,
ConsistencyLevel.Session,
sessionContainer,
partitionKeyRangeCache.Object,
clientCollectionCache.Object,
endpointManager);
Assert.AreEqual(dsr.Headers[HttpConstants.HttpHeaders.SessionToken], "range_1:1#9#4=8#5=7");
}

using (DocumentServiceRequest dsr = DocumentServiceRequest.Create(OperationType.Query,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey))
{
// pkrange 2 : which has no session token
dsr.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_2" };
await GatewayStoreModel.ApplySessionTokenAsync(dsr,
ConsistencyLevel.Session,
sessionContainer,
partitionKeyRangeCache.Object,
clientCollectionCache.Object,
endpointManager);
Assert.AreEqual(dsr.Headers[HttpConstants.HttpHeaders.SessionToken], null);

// There exists global session token, but we do not use it
Assert.AreEqual(sessionContainer.ResolveGlobalSessionToken(dsr), "range_1:1#9#4=8#5=7");
}

using (DocumentServiceRequest dsr = DocumentServiceRequest.Create(OperationType.Query,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey))
{
// pkrange 3 : Split scenario where session token exists for parent of pk range
Collection<string> parents = new Collection<string> { "range_1" };
dsr.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_3", Parents = parents };
await GatewayStoreModel.ApplySessionTokenAsync(dsr,
ConsistencyLevel.Session,
sessionContainer,
partitionKeyRangeCache.Object,
clientCollectionCache.Object,
endpointManager);
Assert.AreEqual(dsr.Headers[HttpConstants.HttpHeaders.SessionToken], "range_3:1#9#4=8#5=7");
}
}

/// <summary>
/// When the response contains a PKRangeId header different than the one targeted with the session token, trigger a refresh of the PKRange cache
/// </summary>
Expand Down Expand Up @@ -1023,6 +1106,7 @@ private async Task TestGatewayStoreModelProcessMessageAsync(GatewayStoreModel st
await storeModel.ProcessMessageAsync(request);
request.Headers.Remove("x-ms-session-token");
request.Headers["x-ms-consistency-level"] = "Session";
request.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_0" };
await storeModel.ProcessMessageAsync(request);
}
}
Expand Down

0 comments on commit 67cbd89

Please sign in to comment.