Skip to content

Commit

Permalink
Gateway: Adds optimization to reduce gateway header size by scoping s…
Browse files Browse the repository at this point in the history
…ession token to specific partitions. (#2165)

Currently in Gateway Mode, the entire session token from all Partitions is sent to the backend. With this change we try to resolve the partition of the request before sending to gateway and send only that session token. This will solve the issue of large headers leading to 400 errors.
We first try to resolve the partition from PartitionKeyRangeCache. If we can't, we refresh the cache. If we still can't we send the GlobalSessionToken.
  • Loading branch information
asketagarwal committed Mar 29, 2021
1 parent 94914c4 commit 42e597c
Show file tree
Hide file tree
Showing 6 changed files with 365 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ private async Task GetInitializationTaskAsync(IStoreClientFactory storeClientFac
this.partitionKeyRangeCache = new PartitionKeyRangeCache(this, this.GatewayStoreModel, this.collectionCache);
this.ResetSessionTokenRetryPolicy = new ResetSessionTokenRetryPolicyFactory(this.sessionContainer, this.collectionCache, this.retryPolicy);

gatewayStoreModel.SetCaches(this.partitionKeyRangeCache, this.collectionCache);

if (this.ConnectionPolicy.ConnectionMode == ConnectionMode.Gateway)
{
this.StoreModel = this.GatewayStoreModel;
Expand Down
154 changes: 146 additions & 8 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ namespace Microsoft.Azure.Cosmos
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Collections;
using Newtonsoft.Json;
Expand All @@ -28,6 +31,10 @@ internal class GatewayStoreModel : IStoreModel, IDisposable

private GatewayStoreClient gatewayStoreClient;

// Caches to resolve the PartitionKeyRange from request. For Session Token Optimization.
private ClientCollectionCache clientCollectionCache;
private PartitionKeyRangeCache partitionKeyRangeCache;

public GatewayStoreModel(
GlobalEndpointManager endpointManager,
ISessionContainer sessionContainer,
Expand All @@ -49,10 +56,12 @@ public GatewayStoreModel(

public virtual async Task<DocumentServiceResponse> ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken = default)
{
GatewayStoreModel.ApplySessionToken(
await GatewayStoreModel.ApplySessionTokenAsync(
request,
this.defaultConsistencyLevel,
this.sessionContainer);
this.sessionContainer,
this.partitionKeyRangeCache,
this.clientCollectionCache);

DocumentServiceResponse response;
try
Expand Down Expand Up @@ -143,6 +152,13 @@ public virtual async Task<AccountProperties> GetDatabaseAccountAsync(Func<ValueT
return databaseAccount;
}

public void SetCaches(PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
{
this.clientCollectionCache = clientCollectionCache;
this.partitionKeyRangeCache = partitionKeyRangeCache;
}

public void Dispose()
{
this.Dispose(true);
Expand Down Expand Up @@ -193,10 +209,12 @@ private void CaptureSessionToken(
}
}

internal static void ApplySessionToken(
internal static async Task ApplySessionTokenAsync(
DocumentServiceRequest request,
ConsistencyLevel defaultConsistencyLevel,
ISessionContainer sessionContainer)
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
{
if (request.Headers == null)
{
Expand Down Expand Up @@ -227,20 +245,140 @@ internal static void ApplySessionToken(
(!string.IsNullOrEmpty(requestConsistencyLevel)
&& string.Equals(requestConsistencyLevel, ConsistencyLevel.Session.ToString(), StringComparison.OrdinalIgnoreCase));

if (!sessionConsistency)
if (!sessionConsistency || (!request.IsReadOnlyRequest && request.OperationType != OperationType.Batch))
{
return; // Only apply the session token in case of session consistency
return; // Only apply the session token in case of session consistency and the request is read only
}

//Apply the ambient session.
string sessionToken = sessionContainer.ResolveGlobalSessionToken(request);
(bool isSuccess, string sessionToken) = await GatewayStoreModel.TryResolveSessionTokenAsync(request,
sessionContainer,
partitionKeyRangeCache,
clientCollectionCache);

if (!isSuccess)
{
sessionToken = sessionContainer.ResolveGlobalSessionToken(request);
}

if (!string.IsNullOrEmpty(sessionToken))
{
request.Headers[HttpConstants.HttpHeaders.SessionToken] = sessionToken;
}
}

internal static async Task<Tuple<bool, string>> TryResolveSessionTokenAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache)
{
if (request == null)
{
throw new ArgumentNullException(nameof(request));
}

if (sessionContainer == null)
{
throw new ArgumentNullException(nameof(sessionContainer));
}

if (partitionKeyRangeCache == null)
{
throw new ArgumentNullException(nameof(partitionKeyRangeCache));
}

if (clientCollectionCache == null)
{
throw new ArgumentNullException(nameof(clientCollectionCache));
}

if (request.ResourceType.IsPartitioned())
{
(bool isSuccess, PartitionKeyRange partitionKeyRange) = await TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: false);

if (isSuccess && sessionContainer is SessionContainer gatewaySessionContainer)
{
request.RequestContext.ResolvedPartitionKeyRange = partitionKeyRange;
string localSessionToken = gatewaySessionContainer.ResolvePartitionLocalSessionTokenForGateway(request, partitionKeyRange.Id);
if (!string.IsNullOrEmpty(localSessionToken))
{
return new Tuple<bool, string>(true, localSessionToken);
}
}
}

return new Tuple<bool, string>(false, null);
}

private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKeyRangeAsync(DocumentServiceRequest request,
ISessionContainer sessionContainer,
PartitionKeyRangeCache partitionKeyRangeCache,
ClientCollectionCache clientCollectionCache,
bool refreshCache)
{
if (refreshCache)
{
request.ForceMasterRefresh = true;
request.ForceNameCacheRefresh = true;
}

PartitionKeyRange partitonKeyRange = null;
ContainerProperties collection = await clientCollectionCache.ResolveCollectionAsync(request, CancellationToken.None, NoOpTrace.Singleton);

string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey];
if (partitionKeyString != null)
{
CollectionRoutingMap collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);

if (refreshCache && collectionRoutingMap != null)
{
collectionRoutingMap = await partitionKeyRangeCache.TryLookupAsync(collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
}

partitonKeyRange = AddressResolver.TryResolveServerPartitionByPartitionKey(request: request,
partitionKeyString: partitionKeyString,
collectionCacheUptoDate: false,
collection: collection,
routingMap: collectionRoutingMap);
}
else if (request.PartitionKeyRangeIdentity != null)
{
PartitionKeyRangeIdentity partitionKeyRangeId = request.PartitionKeyRangeIdentity;
partitonKeyRange = await partitionKeyRangeCache.TryGetPartitionKeyRangeByIdAsync(collection.ResourceId,
partitionKeyRangeId.ToString(),
NoOpTrace.Singleton,
refreshCache);
}

if (partitonKeyRange == null)
{
if (refreshCache)
{
return new Tuple<bool, PartitionKeyRange>(false, null);
}

// need to refresh cache. Maybe split happened.
return await GatewayStoreModel.TryResolvePartitionKeyRangeAsync(request: request,
sessionContainer: sessionContainer,
partitionKeyRangeCache: partitionKeyRangeCache,
clientCollectionCache: clientCollectionCache,
refreshCache: true);
}

return new Tuple<bool, PartitionKeyRange>(true, partitonKeyRange);
}

// DEVNOTE: This can be replace with ReplicatedResourceClient.IsMasterOperation on next Direct sync
internal static bool IsMasterOperation(
ResourceType resourceType,
Expand Down
6 changes: 3 additions & 3 deletions Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ private async Task<ResolutionResult> TryResolveServerPartitionAsync(
object effectivePartitionKeyStringObject = null;
if (partitionKeyString != null)
{
range = this.TryResolveServerPartitionByPartitionKey(
range = AddressResolver.TryResolveServerPartitionByPartitionKey(
request,
partitionKeyString,
collectionCacheIsUptodate,
Expand Down Expand Up @@ -548,7 +548,7 @@ private PartitionKeyRange TryResolveSinglePartitionCollection(
// due to parallel usage of V3 SDK and a possible storage or throughput split
// The current client might be legacy and not aware of this.
// In such case route the request to the first partition
return this.TryResolveServerPartitionByPartitionKey(
return AddressResolver.TryResolveServerPartitionByPartitionKey(
request,
"[]", // This corresponds to first partition
collectionCacheIsUptoDate,
Expand Down Expand Up @@ -629,7 +629,7 @@ private async Task<ResolutionResult> TryResolveServerPartitionByPartitionKeyRang
return new ResolutionResult(partitionKeyRange, addresses, identity);
}

private PartitionKeyRange TryResolveServerPartitionByPartitionKey(
internal static PartitionKeyRange TryResolveServerPartitionByPartitionKey(
DocumentServiceRequest request,
string partitionKeyString,
bool collectionCacheUptoDate,
Expand Down
32 changes: 32 additions & 0 deletions Microsoft.Azure.Cosmos/src/SessionContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public ISessionToken ResolvePartitionLocalSessionToken(DocumentServiceRequest re
return SessionContainer.ResolvePartitionLocalSessionToken(this.state, request, partitionKeyRangeId);
}

public string ResolvePartitionLocalSessionTokenForGateway(DocumentServiceRequest request, string partitionKeyRangeId)
{
return SessionContainer.ResolvePartitionLocalSessionTokenForGateway(this.state, request, partitionKeyRangeId);
}

public void ClearTokenByCollectionFullname(string collectionFullname)
{
SessionContainer.ClearTokenByCollectionFullname(this.state, collectionFullname);
Expand Down Expand Up @@ -139,6 +144,33 @@ private static ISessionToken ResolvePartitionLocalSessionToken(SessionContainerS
return SessionTokenHelper.ResolvePartitionLocalSessionToken(request, partitionKeyRangeId, SessionContainer.GetPartitionKeyRangeIdToTokenMap(self, request));
}

private static string ResolvePartitionLocalSessionTokenForGateway(SessionContainerState self,
DocumentServiceRequest request,
string partitionKeyRangeId)
{
ConcurrentDictionary<string, ISessionToken> partitionKeyRangeIdToTokenMap = SessionContainer.GetPartitionKeyRangeIdToTokenMap(self, request);
if (partitionKeyRangeIdToTokenMap != null)
{
if (partitionKeyRangeIdToTokenMap.TryGetValue(partitionKeyRangeId, out ISessionToken sessionToken))
{
return partitionKeyRangeId + ":" + sessionToken.ConvertToString();
}
else if (request.RequestContext.ResolvedPartitionKeyRange.Parents != null)
{
for (int parentIndex = request.RequestContext.ResolvedPartitionKeyRange.Parents.Count - 1; parentIndex >= 0; parentIndex--)
{
if (partitionKeyRangeIdToTokenMap.TryGetValue(request.RequestContext.ResolvedPartitionKeyRange.Parents[parentIndex],
out sessionToken))
{
return partitionKeyRangeId + ":" + sessionToken.ConvertToString();
}
}
}
}

return null;
}

private static void ClearTokenByCollectionFullname(SessionContainerState self, string collectionFullname)
{
if (!string.IsNullOrEmpty(collectionFullname))
Expand Down
Loading

0 comments on commit 42e597c

Please sign in to comment.