Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session: Removes Global Session Token on GW request if we can't resolve the scoped session t… #2975

Merged
merged 3 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,7 @@ internal static async Task ApplySessionTokenAsync(
partitionKeyRangeCache,
clientCollectionCache);

if (!isSuccess)
{
sessionToken = sessionContainer.ResolveGlobalSessionToken(request);
}

if (!string.IsNullOrEmpty(sessionToken))
if (isSuccess && !string.IsNullOrEmpty(sessionToken))
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
{
request.Headers[HttpConstants.HttpHeaders.SessionToken] = sessionToken;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Tests;
using Microsoft.Azure.Cosmos.Tracing;
using Newtonsoft.Json;
using System.Collections.ObjectModel;

/// <summary>
/// Tests for <see cref="GatewayStoreModel"/>.
Expand Down Expand Up @@ -253,8 +254,7 @@ public async Task TestApplySessionForDataOperation(bool multiMaster)
List<ResourceType> resourceTypes = new List<ResourceType>()
{
ResourceType.Document,
ResourceType.Conflict,
ResourceType.Batch
ResourceType.Conflict
};

List<OperationType> operationTypes = new List<OperationType>()
Expand All @@ -263,7 +263,8 @@ public async Task TestApplySessionForDataOperation(bool multiMaster)
OperationType.Delete,
OperationType.Read,
OperationType.Upsert,
OperationType.Replace
OperationType.Replace,
OperationType.Batch
};

foreach (ResourceType resourceType in resourceTypes)
Expand Down Expand Up @@ -301,31 +302,33 @@ await GatewayStoreModel.ApplySessionTokenAsync(

{
// Verify when user does not set session token
DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.CreateFromName(
operationType,
"Test",
resourceType,
AuthorizationTokenType.PrimaryMasterKey);
DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.Create(operationType,
resourceType,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey);

string dsrSessionToken = Guid.NewGuid().ToString();
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken);
SessionContainer sessionContainer = new SessionContainer(string.Empty);
sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

Mock<IGlobalEndpointManager> globalEndpointManager = new Mock<IGlobalEndpointManager>();
globalEndpointManager.Setup(gem => gem.CanUseMultipleWriteLocations(It.Is<DocumentServiceRequest>(drs => drs == dsrNoSessionToken))).Returns(multiMaster);
await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) =>
{
dsrNoSessionToken.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_1" };
await GatewayStoreModel.ApplySessionTokenAsync(
dsrNoSessionToken,
ConsistencyLevel.Session,
sMock.Object,
sessionContainer,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: globalEndpointManager.Object);

if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch || multiMaster)
{
Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
Assert.AreEqual("range_1:1#9#4=8#5=7", dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
else
{
Expand All @@ -336,18 +339,19 @@ await GatewayStoreModel.ApplySessionTokenAsync(

{
// Verify when partition key range is configured
DocumentServiceRequest dsr = DocumentServiceRequest.CreateFromName(
operationType,
"Test",
resourceType,
AuthorizationTokenType.PrimaryMasterKey);

string partitionKeyRangeId = "1";
dsr.Headers[WFConstants.BackendHeaders.PartitionKeyRangeId] = new PartitionKeyRangeIdentity(partitionKeyRangeId).ToHeader();

string dsrSessionToken = Guid.NewGuid().ToString();
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsr)).Returns(dsrSessionToken);
string partitionKeyRangeId = "range_1";
DocumentServiceRequest dsr = DocumentServiceRequest.Create(operationType,
resourceType,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey,
new StoreRequestNameValueCollection() { { WFConstants.BackendHeaders.PartitionKeyRangeId, new PartitionKeyRangeIdentity(partitionKeyRangeId).ToHeader() } });


SessionContainer sessionContainer = new SessionContainer(string.Empty);
sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

ContainerProperties containerProperties = ContainerProperties.CreateWithResourceId("ccZ1ANCszwk=");
containerProperties.Id = "TestId";
Expand All @@ -364,19 +368,19 @@ await GatewayStoreModel.ApplySessionTokenAsync(
containerProperties.ResourceId,
partitionKeyRangeId,
It.IsAny<ITrace>(),
false)).Returns(Task.FromResult(new PartitionKeyRange()));
false)).Returns(Task.FromResult(new PartitionKeyRange { Id = "range_1" }));

await GatewayStoreModel.ApplySessionTokenAsync(
dsr,
ConsistencyLevel.Session,
sMock.Object,
sessionContainer,
partitionKeyRangeCache: mockPartitionKeyRangeCache.Object,
clientCollectionCache: mockCollectionCahce.Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

if (dsr.IsReadOnlyRequest || dsr.OperationType == OperationType.Batch)
{
Assert.AreEqual(dsrSessionToken, dsr.Headers[HttpConstants.HttpHeaders.SessionToken]);
Assert.AreEqual("range_1:1#9#4=8#5=7", dsr.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
else
{
Expand Down Expand Up @@ -424,33 +428,35 @@ public async Task TestRequestOverloadRemovesSessionToken(bool multiMaster, bool
INameValueCollection headers = new StoreRequestNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.ConsistencyLevel, ConsistencyLevel.Eventual.ToString());

DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.CreateFromName(
isWriteRequest ? OperationType.Create : OperationType.Read,
"Test",
ResourceType.Document,
AuthorizationTokenType.PrimaryMasterKey,
headers);
DocumentServiceRequest dsrNoSessionToken = DocumentServiceRequest.Create(isWriteRequest ? OperationType.Create : OperationType.Read,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1/docs/doc1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey,
headers);

string dsrSessionToken = Guid.NewGuid().ToString();
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken);
SessionContainer sessionContainer = new SessionContainer(string.Empty);
sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

Mock<IGlobalEndpointManager> globalEndpointManager = new Mock<IGlobalEndpointManager>();
globalEndpointManager.Setup(gem => gem.CanUseMultipleWriteLocations(It.Is<DocumentServiceRequest>(drs => drs == dsrNoSessionToken))).Returns(multiMaster);
await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) =>
{
dsrNoSessionToken.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_1" };
await GatewayStoreModel.ApplySessionTokenAsync(
dsrNoSessionToken,
ConsistencyLevel.Session,
sMock.Object,
sessionContainer,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: globalEndpointManager.Object);

if (isWriteRequest && multiMaster)
{
// Multi master write requests should not lower the consistency and remove the session token
Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
Assert.AreEqual("range_1:1#9#4=8#5=7", dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
else
{
Expand Down Expand Up @@ -881,6 +887,83 @@ private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownR
}
}

[TestMethod]
[Owner("askagarw")]
public async Task GatewayStoreModel_AvoidGlobalSessionToken()
{
Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));
using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
using GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
ConsistencyLevel.Session,
eventSource,
null,
MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient()));
Mock<ClientCollectionCache> clientCollectionCache = new Mock<ClientCollectionCache>(new SessionContainer("testhost"), storeModel, null, null);
Mock<PartitionKeyRangeCache> partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(null, storeModel, clientCollectionCache.Object);

sessionContainer.SetSessionToken(
ResourceId.NewDocumentCollectionId(42, 129).DocumentCollectionId.ToString(),
"dbs/db1/colls/coll1",
new StoreRequestNameValueCollection() { { HttpConstants.HttpHeaders.SessionToken, "range_1:1#9#4=8#5=7" } });

using (DocumentServiceRequest dsr = DocumentServiceRequest.Create(OperationType.Query,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey))
{
// pkrange 1 : which has session token
dsr.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_1" };
await GatewayStoreModel.ApplySessionTokenAsync(dsr,
ConsistencyLevel.Session,
sessionContainer,
partitionKeyRangeCache.Object,
clientCollectionCache.Object,
endpointManager);
Assert.AreEqual(dsr.Headers[HttpConstants.HttpHeaders.SessionToken], "range_1:1#9#4=8#5=7");
}

using (DocumentServiceRequest dsr = DocumentServiceRequest.Create(OperationType.Query,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey))
{
// pkrange 2 : which has no session token
dsr.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_2" };
await GatewayStoreModel.ApplySessionTokenAsync(dsr,
ConsistencyLevel.Session,
sessionContainer,
partitionKeyRangeCache.Object,
clientCollectionCache.Object,
endpointManager);
Assert.AreEqual(dsr.Headers[HttpConstants.HttpHeaders.SessionToken], null);

// There exists global session token, but we do not use it
Assert.AreEqual(sessionContainer.ResolveGlobalSessionToken(dsr), "range_1:1#9#4=8#5=7");
}

using (DocumentServiceRequest dsr = DocumentServiceRequest.Create(OperationType.Query,
ResourceType.Document,
new Uri("https://foo.com/dbs/db1/colls/coll1", UriKind.Absolute),
AuthorizationTokenType.PrimaryMasterKey))
{
// pkrange 3 : Split scenario where session token exists for parent of pk range
Collection<string> parents = new Collection<string> { "range_1" };
dsr.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_3", Parents = parents };
await GatewayStoreModel.ApplySessionTokenAsync(dsr,
ConsistencyLevel.Session,
sessionContainer,
partitionKeyRangeCache.Object,
clientCollectionCache.Object,
endpointManager);
Assert.AreEqual(dsr.Headers[HttpConstants.HttpHeaders.SessionToken], "range_3:1#9#4=8#5=7");
}
}

/// <summary>
/// When the response contains a PKRangeId header different than the one targeted with the session token, trigger a refresh of the PKRange cache
/// </summary>
Expand Down Expand Up @@ -1023,6 +1106,7 @@ private async Task TestGatewayStoreModelProcessMessageAsync(GatewayStoreModel st
await storeModel.ProcessMessageAsync(request);
request.Headers.Remove("x-ms-session-token");
request.Headers["x-ms-consistency-level"] = "Session";
request.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange { Id = "range_0" };
await storeModel.ProcessMessageAsync(request);
}
}
Expand Down