Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gateway: Adds optimization to reduce gateway header size by scoping session token to specific partitions. #2165

Merged
merged 21 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
43f2366
Session Token Optimization for Gateway Mode.
asketagarwal Jan 29, 2021
d436acf
Adding Test for Reading session token after write.
asketagarwal Feb 1, 2021
e3c916b
Changing the Properties to private
asketagarwal Feb 2, 2021
08daac3
Avoiding null returning methods.
asketagarwal Feb 2, 2021
1f82dbc
Improving Readability
asketagarwal Feb 3, 2021
5951f33
Having an explicit resolve session token method for gateway
asketagarwal Feb 8, 2021
dd4b7e9
Improving styling and readability
asketagarwal Feb 16, 2021
cbc0bab
Merge remote-tracking branch 'upstream/master' into users/askagarw/Se…
asketagarwal Feb 16, 2021
e59f07e
MErging latest changes
asketagarwal Feb 16, 2021
59bec16
Merge branch 'master' into users/askagarw/SessionTokenOptimisation
j82w Feb 19, 2021
046ab92
Merge remote-tracking branch 'upstream/master' into users/askagarw/Se…
asketagarwal Mar 16, 2021
f1d56e5
nitpicks
asketagarwal Mar 17, 2021
dae7387
Merge branch 'master' into users/askagarw/SessionTokenOptimisation
j82w Mar 22, 2021
8c1e990
Merge remote-tracking branch 'upstream/master' into users/askagarw/Se…
asketagarwal Mar 23, 2021
e10e1e6
Merge branch 'users/askagarw/SessionTokenOptimisation' of https://git…
asketagarwal Mar 23, 2021
13474de
Fixing build
asketagarwal Mar 23, 2021
aa7f1b9
Adding Resolved PArtition to request
asketagarwal Mar 29, 2021
5a47553
adding class name to static method
asketagarwal Mar 29, 2021
2ca9628
Merge branch 'master' into users/askagarw/SessionTokenOptimisation
j82w Mar 29, 2021
0a4bba5
Merge branch 'master' into users/askagarw/SessionTokenOptimisation
j82w Mar 29, 2021
f8c3b33
Merge branch 'master' into users/askagarw/SessionTokenOptimisation
j82w Mar 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,8 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFac
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

if (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway)
{
this.StoreModel = this.GatewayStoreModel;
Expand Down
145 changes: 137 additions & 8 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
Expand All @@ -28,6 +30,10 @@ internal class GatewayStoreModel : IStoreModel, IDisposable

private GatewayStoreClient gatewayStoreClient;

// Caches to resolve the PartitionKeyRange from request. For Session Token Optimization.
private ClientCollectionCache clientCollectionCache;
private PartitionKeyRangeCache partitionKeyRangeCache;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
Expand All @@ -49,10 +55,12 @@ public GatewayStoreModel(

public virtual async Task<DocumentServiceResponse> ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default)
{
GatewayStoreModel.ApplySessionToken(
await GatewayStoreModel.ApplySessionTokenAsync(
request,
this.defaultConsistencyLevel,
this.sessionContainer);
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache);

DocumentServiceResponse response;
try
Expand Down Expand Up @@ -143,6 +151,13 @@ public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueT
return databaseAccount;
}

public void SetCaches(PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
{
this.clientCollectionCache = clientCollectionCache;
this.partitionKeyRangeCache = partitionKeyRangeCache;
}

public void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -193,10 +208,12 @@ private void CaptureSessionToken(
}
}

internal static void ApplySessionToken(
internal static async Task ApplySessionTokenAsync(
DocumentServiceRequest request,
ConsistencyLevel defaultConsistencyLevel,
ISessionContainer sessionContainer)
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
{
if (request.Headers == null)
{
Expand Down Expand Up @@ -227,20 +244,132 @@ internal static void ApplySessionToken(
(!string.IsNullOrEmpty(requestConsistencyLevel)
&& string.Equals(requestConsistencyLevel, ConsistencyLevel.Session.ToString(), StringComparison.OrdinalIgnoreCase));

if (!sessionConsistency)
if (!sessionConsistency || (!request.IsReadOnlyRequest && request.OperationType != OperationType.Batch))
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
{
return; // Only apply the session token in case of session consistency
return; // Only apply the session token in case of session consistency and the request is read only
}

//Apply the ambient session.
string sessionToken = sessionContainer.ResolveGlobalSessionToken(request);
(bool isSuccess, string sessionToken) = await TryResolveSessionTokenAsync(request,
sessionContainer,
partitionKeyRangeCache,
clientCollectionCache);

if (!isSuccess)
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
{
sessionToken = sessionContainer.ResolveGlobalSessionToken(request);
}

if (!string.IsNullOrEmpty(sessionToken))
{
request.Headers[HttpConstants.HttpHeaders.SessionToken] = sessionToken;
}
}

internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
{
if (request == null)
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ArgumentNullException(nameof(request));
}

if (sessionContainer == null)
{
throw new ArgumentNullException(nameof(sessionContainer));
}

if (partitionKeyRangeCache == null)
{
throw new ArgumentNullException(nameof(partitionKeyRangeCache));
}

if (clientCollectionCache == null)
{
throw new ArgumentNullException(nameof(clientCollectionCache));
}

if (request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: false);

if (isSuccess && sessionContainer is SessionContainer gatewaySessionContainer)
{
string localSessionToken = gatewaySessionContainer.ResolvePartitionLocalSessionTokenForGateway(request, partitionKeyRange.Id);
if (!string.IsNullOrEmpty(localSessionToken))
{
return new Tuple<bool, string>(true, localSessionToken);
}
}
}

return new Tuple<bool, string>(false, null);
}

private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache,
bool refreshCache)
{
if (refreshCache)
{
request.ForceMasterRefresh = true;
request.ForceNameCacheRefresh = true;
}

PartitionKeyRange partitonKeyRange = null;
ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(request, CancellationToken.None);

string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey];
if (partitionKeyString != null)
{
CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None);

if (refreshCache && collectionRoutingMap != null)
{
collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None);
}

partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(request: request,
partitionKeyString: partitionKeyString,
collectionCacheUptoDate: false,
collection: collection,
routingMap: collectionRoutingMap);
}
else if (request.PartitionKeyRangeIdentity != null)
{
PartitionKeyRangeIdentity partitionKeyRangeId = request.PartitionKeyRangeIdentity;
partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(collection.ResourceId,
partitionKeyRangeId.ToString(),
refreshCache);
}

if (partitonKeyRange == null)
{
if (refreshCache)
{
return new Tuple<bool, PartitionKeyRange>(false, null);
}

// need to refresh cache. Maybe split happened.
return await TryResolvePartitionKeyRangeAsync(request, sessionContainer, partitionKeyRangeCache, clientCollectionCache, true);
j82w marked this conversation as resolved.
Show resolved Hide resolved
j82w marked this conversation as resolved.
Show resolved Hide resolved
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}

return new Tuple<bool, PartitionKeyRange>(true, partitonKeyRange);
}

// DEVNOTE: This can be replace with ReplicatedResourceClient.IsMasterOperation on next Direct sync
internal static bool IsMasterOperation(
ResourceType resourceType,
Expand Down
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ private async Task<ResolutionResult> TryResolveServerPartitionAsync(
object effectivePartitionKeyStringObject = null;
if (partitionKeyString != null)
{
range = this.TryResolveServerPartitionByPartitionKey(
asketagarwal marked this conversation as resolved.
Show resolved Hide resolved
range = AddressResolver.TryResolveServerPartitionByPartitionKey(
request,
partitionKeyString,
collectionCacheIsUptodate,
Expand Down Expand Up @@ -543,7 +543,7 @@ private PartitionKeyRange TryResolveSinglePartitionCollection(
// due to parallel usage of V3 SDK and a possible storage or throughput split
// The current client might be legacy and not aware of this.
// In such case route the request to the first partition
return this.TryResolveServerPartitionByPartitionKey(
return AddressResolver.TryResolveServerPartitionByPartitionKey(
request,
"[]", // This corresponds to first partition
collectionCacheIsUptoDate,
Expand Down Expand Up @@ -624,7 +624,7 @@ private async Task<ResolutionResult> TryResolveServerPartitionByPartitionKeyRang
return new ResolutionResult(partitionKeyRange, addresses, identity);
}

private PartitionKeyRange TryResolveServerPartitionByPartitionKey(
internal static PartitionKeyRange TryResolveServerPartitionByPartitionKey(
DocumentServiceRequest request,
string partitionKeyString,
bool collectionCacheUptoDate,
Expand Down
32 changes: 32 additions & 0 deletions Microsoft.Azure.Cosmos/src/SessionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public ISessionToken ResolvePartitionLocalSessionToken(DocumentServiceRequest re
return SessionContainer.ResolvePartitionLocalSessionToken(this.state, request, partitionKeyRangeId);
}

public string ResolvePartitionLocalSessionTokenForGateway(DocumentServiceRequest request, string partitionKeyRangeId)
{
return SessionContainer.ResolvePartitionLocalSessionTokenForGateway(this.state, request, partitionKeyRangeId);
}

public void ClearTokenByCollectionFullname(string collectionFullname)
{
SessionContainer.ClearTokenByCollectionFullname(this.state, collectionFullname);
Expand Down Expand Up @@ -139,6 +144,33 @@ private static ISessionToken ResolvePartitionLocalSessionToken(SessionContainerS
return SessionTokenHelper.ResolvePartitionLocalSessionToken(request, partitionKeyRangeId, SessionContainer.GetPartitionKeyRangeIdToTokenMap(self, request));
}

private static string ResolvePartitionLocalSessionTokenForGateway(SessionContainerState self,
DocumentServiceRequest request,
string partitionKeyRangeId)
{
ConcurrentDictionary<string, ISessionToken> partitionKeyRangeIdToTokenMap = SessionContainer.GetPartitionKeyRangeIdToTokenMap(self, request);
if (partitionKeyRangeIdToTokenMap != null)
{
if (partitionKeyRangeIdToTokenMap.TryGetValue(partitionKeyRangeId, out ISessionToken sessionToken))
{
return partitionKeyRangeId + ":" + sessionToken.ConvertToString();
}
else if (request.RequestContext.ResolvedPartitionKeyRange.Parents != null)
{
for (int parentIndex = request.RequestContext.ResolvedPartitionKeyRange.Parents.Count - 1; parentIndex >= 0; parentIndex--)
{
if (partitionKeyRangeIdToTokenMap.TryGetValue(request.RequestContext.ResolvedPartitionKeyRange.Parents[parentIndex],
out sessionToken))
{
return partitionKeyRangeId + ":" + sessionToken.ConvertToString();
ealsur marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

return null;
}

private static void ClearTokenByCollectionFullname(SessionContainerState self, string collectionFullname)
{
if (!string.IsNullOrEmpty(collectionFullname))
Expand Down
Loading