Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session: Fixes scoped session token on partition splits and multiple write endpoints enabled #2937

Merged
merged 8 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(
this.defaultConsistencyLevel,
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache);
this.clientCollectionCache,
this.endpointManager);

DocumentServiceResponse response;
try
Expand All @@ -80,13 +81,13 @@ await GatewayStoreModel.ApplySessionTokenAsync(
(exception.StatusCode == HttpStatusCode.PreconditionFailed || exception.StatusCode == HttpStatusCode.Conflict
|| (exception.StatusCode == HttpStatusCode.NotFound && exception.GetSubStatus() != SubStatusCodes.ReadSessionNotAvailable)))
{
this.CaptureSessionToken(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers);
await this.CaptureSessionTokenAndHandleSplitAsync(exception.StatusCode, exception.GetSubStatus(), request, exception.Headers);
}

throw;
}

this.CaptureSessionToken(response.StatusCode, response.SubStatusCode, request, response.Headers);
await this.CaptureSessionTokenAndHandleSplitAsync(response.StatusCode, response.SubStatusCode, request, response.Headers);
return response;
}

Expand Down Expand Up @@ -172,7 +173,7 @@ public void Dispose()
this.Dispose(true);
}

private void CaptureSessionToken(
private async Task CaptureSessionTokenAndHandleSplitAsync(
HttpStatusCode? statusCode,
SubStatusCodes subStatusCode,
DocumentServiceRequest request,
Expand Down Expand Up @@ -214,6 +215,20 @@ private void CaptureSessionToken(
else
{
this.sessionContainer.SetSessionToken(request, responseHeaders);
PartitionKeyRange detectedPartitionKeyRange = request.RequestContext.ResolvedPartitionKeyRange;
string partitionKeyRangeInResponse = responseHeaders[HttpConstants.HttpHeaders.PartitionKeyRangeId];
if (detectedPartitionKeyRange != null
&& !string.IsNullOrEmpty(partitionKeyRangeInResponse)
&& !string.IsNullOrEmpty(request.RequestContext.ResolvedCollectionRid)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
&& !partitionKeyRangeInResponse.Equals(detectedPartitionKeyRange.Id, StringComparison.OrdinalIgnoreCase))
{
// The request ended up being on a different partition unknown to the client, so we better refresh the caches
await this.partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(
request.RequestContext.ResolvedCollectionRid,
partitionKeyRangeInResponse,
NoOpTrace.Singleton,
ealsur marked this conversation as resolved.
Show resolved Hide resolved
forceRefresh: true);
}
}
}

Expand All @@ -222,7 +237,8 @@ internal static async Task ApplySessionTokenAsync(
ConsistencyLevel defaultConsistencyLevel,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
CollectionCache clientCollectionCache)
CollectionCache clientCollectionCache,
IGlobalEndpointManager globalEndpointManager)
{
if (request.Headers == null)
{
Expand Down Expand Up @@ -253,9 +269,14 @@ internal static async Task ApplySessionTokenAsync(
(!string.IsNullOrEmpty(requestConsistencyLevel)
&& string.Equals(requestConsistencyLevel, ConsistencyLevel.Session.ToString(), StringComparison.OrdinalIgnoreCase));
ealsur marked this conversation as resolved.
Show resolved Hide resolved

if (!sessionConsistency || (!request.IsReadOnlyRequest && request.OperationType != OperationType.Batch))
bool isMultiMasterEnabledForRequest = globalEndpointManager.CanUseMultipleWriteLocations(request);
kirankumarkolli marked this conversation as resolved.
Show resolved Hide resolved

if (!sessionConsistency
|| (!request.IsReadOnlyRequest
&& request.OperationType != OperationType.Batch
&& !isMultiMasterEnabledForRequest))
{
return; // Only apply the session token in case of session consistency and the request is read only
return; // Only apply the session token in case of session consistency and the request is read only or read/write on multimaster
}

(bool isSuccess, string sessionToken) = await GatewayStoreModel.TryResolveSessionTokenAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(request,
Cosmos.ConsistencyLevel.Session,
sessionContainer,
await this.cosmosClient.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton),
await this.cosmosClient.DocumentClient.GetCollectionCacheAsync(NoOpTrace.Singleton));
await this.cosmosClient.DocumentClient.GetCollectionCacheAsync(NoOpTrace.Singleton),
this.cosmosClient.DocumentClient.GlobalEndpointManager);

string sessionToken = request.Headers[HttpConstants.HttpHeaders.SessionToken];
Assert.IsTrue(!string.IsNullOrEmpty(sessionToken) && sessionToken.Split(',').Length == 1);
Expand Down Expand Up @@ -139,7 +140,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(request,
Cosmos.ConsistencyLevel.Session,
client.DocumentClient.sessionContainer,
await client.DocumentClient.GetPartitionKeyRangeCacheAsync(NoOpTrace.Singleton),
await client.DocumentClient.GetCollectionCacheAsync(NoOpTrace.Singleton));
await client.DocumentClient.GetCollectionCacheAsync(NoOpTrace.Singleton),
client.DocumentClient.GlobalEndpointManager);

string readSessionToken = request.Headers[HttpConstants.HttpHeaders.SessionToken];
Assert.AreEqual(readSessionToken, createSessionToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(
ConsistencyLevel.Session,
new Mock<ISessionContainer>().Object,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object);
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

Assert.IsNull(dsr.Headers[HttpConstants.HttpHeaders.SessionToken]);
});
Expand All @@ -237,14 +238,17 @@ await GatewayStoreModel.ApplySessionTokenAsync(
ConsistencyLevel.Session,
new Mock<ISessionContainer>().Object,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object);
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

Assert.IsNull(dsrQueryPlan.Headers[HttpConstants.HttpHeaders.SessionToken]);
});
}

[TestMethod]
public async Task TestApplySessionForDataOperation()
[DataTestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task TestApplySessionForDataOperation(bool multiMaster)
{
List<ResourceType> resourceTypes = new List<ResourceType>()
{
Expand Down Expand Up @@ -288,7 +292,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(
ConsistencyLevel.Session,
new Mock<ISessionContainer>().Object,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object);
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

Assert.AreEqual(dsrSessionToken, dsr.Headers[HttpConstants.HttpHeaders.SessionToken]);
});
Expand All @@ -306,16 +311,19 @@ await GatewayStoreModel.ApplySessionTokenAsync(
Mock<ISessionContainer> sMock = new Mock<ISessionContainer>();
sMock.Setup(x => x.ResolveGlobalSessionToken(dsrNoSessionToken)).Returns(dsrSessionToken);

Mock<IGlobalEndpointManager> globalEndpointManager = new Mock<IGlobalEndpointManager>();
globalEndpointManager.Setup(gem => gem.CanUseMultipleWriteLocations(It.Is<DocumentServiceRequest>(drs => drs == dsrNoSessionToken))).Returns(multiMaster);
await this.GetGatewayStoreModelForConsistencyTest(async (gatewayStoreModel) =>
{
await GatewayStoreModel.ApplySessionTokenAsync(
dsrNoSessionToken,
ConsistencyLevel.Session,
sMock.Object,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object);
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: globalEndpointManager.Object);

if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch)
if (dsrNoSessionToken.IsReadOnlyRequest || dsrNoSessionToken.OperationType == OperationType.Batch || multiMaster)
{
Assert.AreEqual(dsrSessionToken, dsrNoSessionToken.Headers[HttpConstants.HttpHeaders.SessionToken]);
}
Expand Down Expand Up @@ -363,7 +371,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(
ConsistencyLevel.Session,
sMock.Object,
partitionKeyRangeCache: mockPartitionKeyRangeCache.Object,
clientCollectionCache: mockCollectionCahce.Object);
clientCollectionCache: mockCollectionCahce.Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

if (dsr.IsReadOnlyRequest || dsr.OperationType == OperationType.Batch)
{
Expand Down Expand Up @@ -398,7 +407,8 @@ await GatewayStoreModel.ApplySessionTokenAsync(
ConsistencyLevel.Session,
new Mock<ISessionContainer>().Object,
partitionKeyRangeCache: new Mock<PartitionKeyRangeCache>(null, null, null).Object,
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object);
clientCollectionCache: new Mock<ClientCollectionCache>(new SessionContainer("testhost"), gatewayStoreModel, null, null).Object,
globalEndpointManager: Mock.Of<IGlobalEndpointManager>());

Assert.AreEqual(sessionToken, dsrSprocExecute.Headers[HttpConstants.HttpHeaders.SessionToken]);
});
Expand Down Expand Up @@ -826,6 +836,71 @@ private async Task GatewayStoreModel_Exceptionless_NotUpdateSessionTokenOnKnownR
}
}

/// <summary>
/// When the response contains a PKRangeId header different than the one targeted with the session token, trigger a refresh of the PKRange cache
/// </summary>
[DataTestMethod]
[DataRow("0", "0", false)]
[DataRow("0", "1", true)]
public async Task GatewayStoreModel_OnSplitRefreshesPKRanges(string originalPKRangeId, string splitPKRangeId, bool shouldCallRefresh)
{
string originalSessionToken = originalPKRangeId+ ":1#100#1=20#2=5#3=30";
string updatedSessionToken = splitPKRangeId + ":1#100#1=20#2=5#3=31";

Task<HttpResponseMessage> sendFunc(HttpRequestMessage request)
{
HttpResponseMessage response = new HttpResponseMessage(HttpStatusCode.OK);
response.Headers.Add(HttpConstants.HttpHeaders.SessionToken, updatedSessionToken);
response.Headers.Add(WFConstants.BackendHeaders.PartitionKeyRangeId, splitPKRangeId);
return Task.FromResult(response);
}

Mock<IDocumentClientInternal> mockDocumentClient = new Mock<IDocumentClientInternal>();
mockDocumentClient.Setup(client => client.ServiceEndpoint).Returns(new Uri("https://foo"));

using GlobalEndpointManager endpointManager = new GlobalEndpointManager(mockDocumentClient.Object, new ConnectionPolicy());
SessionContainer sessionContainer = new SessionContainer(string.Empty);
DocumentClientEventSource eventSource = DocumentClientEventSource.Instance;
HttpMessageHandler messageHandler = new MockMessageHandler(sendFunc);
using GatewayStoreModel storeModel = new GatewayStoreModel(
endpointManager,
sessionContainer,
ConsistencyLevel.Session,
eventSource,
null,
MockCosmosUtil.CreateCosmosHttpClient(() => new HttpClient(messageHandler)));

Mock<ClientCollectionCache> clientCollectionCache = new Mock<ClientCollectionCache>(new SessionContainer("testhost"), storeModel, null, null);

Mock<PartitionKeyRangeCache> partitionKeyRangeCache = new Mock<PartitionKeyRangeCache>(null, storeModel, clientCollectionCache.Object);
storeModel.SetCaches(partitionKeyRangeCache.Object, clientCollectionCache.Object);

INameValueCollection headers = new StoreRequestNameValueCollection();
headers.Set(HttpConstants.HttpHeaders.SessionToken, originalSessionToken);

using (new ActivityScope(Guid.NewGuid()))
{
using (DocumentServiceRequest request = DocumentServiceRequest.Create(
OperationType.Read,
ResourceType.Document,
"dbs/OVJwAA==/colls/OVJwAOcMtA0=/docs/OVJwAOcMtA0BAAAAAAAAAA==/",
AuthorizationTokenType.PrimaryMasterKey,
headers))
{
request.RequestContext.ResolvedCollectionRid = "dbs/OVJwAA==/colls/OVJwAOcMtA0=";
request.RequestContext.ResolvedPartitionKeyRange = new PartitionKeyRange() { Id = originalPKRangeId };
await storeModel.ProcessMessageAsync(request);
Assert.AreEqual(updatedSessionToken, sessionContainer.GetSessionToken("dbs/OVJwAA==/colls/OVJwAOcMtA0="));

partitionKeyRangeCache.Verify(pkRangeCache => pkRangeCache.TryGetPartitionKeyRangeByIdAsync(
It.Is<string>(str => str == "dbs/OVJwAA==/colls/OVJwAOcMtA0="),
It.Is<string>(str => str == splitPKRangeId),
It.IsAny<ITrace>(),
It.Is<bool>(b => b == true)), shouldCallRefresh ? Times.Once : Times.Never);
}
}
}

private class MockMessageHandler : HttpMessageHandler
{
private readonly Func<HttpRequestMessage, Task<HttpResponseMessage>> sendFunc;
Expand Down