diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/CompositeContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/CompositeContinuationToken.cs index 78d0290bb9..c8b9474562 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/CompositeContinuationToken.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/CompositeContinuationToken.cs @@ -10,12 +10,13 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens using Microsoft.Azure.Cosmos.Query.Core.Exceptions; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Routing; + using Microsoft.Azure.Documents.Routing; using Newtonsoft.Json; /// /// A composite continuation token that has both backend continuation token and partition range information. /// - internal sealed class CompositeContinuationToken + internal sealed class CompositeContinuationToken : IPartitionedToken { private static class PropertyNames { @@ -41,6 +42,9 @@ public Documents.Routing.Range Range set; } + [JsonIgnore] + public Range PartitionRange => this.Range; + public object ShallowCopy() { return this.MemberwiseClone(); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/IPartitionedToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/IPartitionedToken.cs new file mode 100644 index 0000000000..2e81a39fa1 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/IPartitionedToken.cs @@ -0,0 +1,11 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens +{ + internal interface IPartitionedToken + { + Documents.Routing.Range PartitionRange { get; } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/OrderByContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/OrderByContinuationToken.cs index 6fd5929708..74cb8fc3f6 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/OrderByContinuationToken.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/OrderByContinuationToken.cs @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens using Microsoft.Azure.Cosmos.Query.Core.Exceptions; using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy; using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Documents.Routing; using Newtonsoft.Json; /// @@ -53,7 +54,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens /// {"compositeToken":{"token":"+RID:OpY0AN-mFAACAAAAAAAABA==#RT:1#TRC:1#RTD:qdTAEA==","range":{"min":"05C1D9CD673398","max":"05C1E399CD6732"}},"orderByItems"[{"item":2}],"rid":"OpY0AN-mFAACAAAAAAAABA==","skipCount":0,"filter":"r.key > 1"} /// ]]> /// - internal sealed class OrderByContinuationToken + internal sealed class OrderByContinuationToken : IPartitionedToken { private static class PropertyNames { @@ -206,6 +207,9 @@ public string Filter get; } + [JsonIgnore] + public Range PartitionRange => this.CompositeContinuationToken.Range; + public static CosmosElement ToCosmosElement(OrderByContinuationToken orderByContinuationToken) { CosmosElement compositeContinuationToken = CompositeContinuationToken.ToCosmosElement(orderByContinuationToken.CompositeContinuationToken); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs index ea2964f834..f9ae8f77bd 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs @@ -12,10 +12,10 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext using Core.ExecutionComponent; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Diagnostics; - using Microsoft.Azure.Cosmos.Json; using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.Query.Core.Collections; using Microsoft.Azure.Cosmos.Query.Core.ComparableTask; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.Azure.Cosmos.Query.Core.Exceptions; using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.Parallel; @@ -322,48 +322,43 @@ public override void Stop() this.comparableTaskScheduler.Stop(); } - /// - /// Initializes cross partition query execution context by initializing the necessary document producers. - /// - /// The collection to drain from. - /// The partitions to target. - /// The page size to start the document producers off with. - /// The query specification for the rewritten query. - /// Map from partition to it's corresponding continuation token. - /// Whether or not we should defer the fetch of the first page from each partition. - /// The filter to inject in the predicate. - /// The callback used to filter each partition. - /// The cancellation token. - /// A task to await on. protected async Task TryInitializeAsync( string collectionRid, - IReadOnlyList partitionKeyRanges, int initialPageSize, SqlQuerySpec querySpecForInit, - IReadOnlyDictionary targetRangeToContinuationMap, + IReadOnlyDictionary targetRangeToContinuationMap, bool deferFirstPage, string filter, Func> tryFilterAsync, CancellationToken cancellationToken) { + if (collectionRid == null) + { + throw new ArgumentNullException(nameof(collectionRid)); + } + + if (initialPageSize < 0) + { + throw new ArgumentOutOfRangeException(nameof(initialPageSize)); + } + + if (querySpecForInit == null) + { + throw new ArgumentNullException(nameof(querySpecForInit)); + } + + if (targetRangeToContinuationMap == null) + { + throw new ArgumentNullException(nameof(targetRangeToContinuationMap)); + } + cancellationToken.ThrowIfCancellationRequested(); List itemProducerTrees = new List(); - foreach (PartitionKeyRange partitionKeyRange in partitionKeyRanges) + foreach (KeyValuePair rangeAndContinuationToken in targetRangeToContinuationMap) { - string initialContinuationToken; - if (targetRangeToContinuationMap != null) - { - if (!targetRangeToContinuationMap.TryGetValue(partitionKeyRange.Id, out initialContinuationToken)) - { - initialContinuationToken = null; - } - } - else - { - initialContinuationToken = null; - } - + PartitionKeyRange partitionKeyRange = rangeAndContinuationToken.Key; + string continuationToken = rangeAndContinuationToken.Value; ItemProducerTree itemProducerTree = new ItemProducerTree( this.queryContext, querySpecForInit, @@ -375,7 +370,7 @@ protected async Task TryInitializeAsync( deferFirstPage, collectionRid, initialPageSize, - initialContinuationToken) + continuationToken) { Filter = filter }; @@ -436,136 +431,127 @@ protected async Task TryInitializeAsync( return TryCatch.FromResult(); } - /// - /// - /// If a query encounters split up resuming using continuation, we need to regenerate the continuation tokens. - /// Specifically, since after split we will have new set of ranges, we need to remove continuation token for the - /// parent partition and introduce continuation token for the child partitions. - /// - /// - /// This function does that. Also in that process, we also check validity of the input continuation tokens. For example, - /// even after split the boundary ranges of the child partitions should match with the parent partitions. If the Min and Max - /// range of a target partition in the continuation token was Min1 and Max1. Then the Min and Max range info for the two - /// corresponding child partitions C1Min, C1Max, C2Min, and C2Max should follow the constrain below: - /// PMax = C2Max > C2Min > C1Max > C1Min = PMin. - /// - /// - /// The partition key ranges to extract continuation tokens for. - /// The continuation token that the user supplied. - /// The type of continuation token to generate. - /// - /// The code assumes that merge doesn't happen and - /// - /// The index of the partition whose MinInclusive is equal to the suppliedContinuationTokens along with the continuation tokens. - protected static TryCatch> TryFindTargetRangeAndExtractContinuationTokens( - List partitionKeyRanges, - IEnumerable>> suppliedContinuationTokens) + public static TryCatch> TryGetInitializationInfo( + IReadOnlyList partitionKeyRanges, + IReadOnlyList partitionedContinuationTokens) + where PartitionedToken : IPartitionedToken { if (partitionKeyRanges == null) { throw new ArgumentNullException(nameof(partitionKeyRanges)); } - if (partitionKeyRanges.Count < 1) + if (partitionedContinuationTokens == null) { - throw new ArgumentException(nameof(partitionKeyRanges)); + throw new ArgumentNullException(nameof(partitionedContinuationTokens)); } - foreach (PartitionKeyRange partitionKeyRange in partitionKeyRanges) + if (partitionKeyRanges.Count < 1) { - if (partitionKeyRange == null) - { - throw new ArgumentException(nameof(partitionKeyRanges)); - } + throw new ArgumentException(nameof(partitionKeyRanges)); } - if (suppliedContinuationTokens == null) + if (partitionedContinuationTokens.Count < 1) { - throw new ArgumentNullException(nameof(suppliedContinuationTokens)); + throw new ArgumentException(nameof(partitionKeyRanges)); } - if (suppliedContinuationTokens.Count() < 1) + if (partitionedContinuationTokens.Count > partitionKeyRanges.Count) { - throw new ArgumentException(nameof(suppliedContinuationTokens)); + throw new ArgumentException($"{nameof(partitionedContinuationTokens)} can not have more elements than {nameof(partitionKeyRanges)}."); } - if (suppliedContinuationTokens.Count() > partitionKeyRanges.Count) - { - throw new ArgumentException($"{nameof(suppliedContinuationTokens)} can not have more elements than {nameof(partitionKeyRanges)}."); - } + // Find the continuation token for the partition we left off on: + PartitionedToken firstContinuationToken = partitionedContinuationTokens + .OrderBy((partitionedToken) => partitionedToken.PartitionRange.Min) + .First(); - Dictionary targetRangeToContinuationTokenMap = new Dictionary(); + // Segment the ranges based off that: + ReadOnlyMemory sortedRanges = partitionKeyRanges + .OrderBy((partitionKeyRange) => partitionKeyRange.MinInclusive) + .ToArray(); - // Find the minimum index. - Tuple> firstContinuationTokenAndRange = suppliedContinuationTokens - .OrderBy((tuple) => tuple.Item2.Min) - .First(); - TContinuationToken firstContinuationToken = firstContinuationTokenAndRange.Item1; PartitionKeyRange firstContinuationRange = new PartitionKeyRange { - MinInclusive = firstContinuationTokenAndRange.Item2.Min, - MaxExclusive = firstContinuationTokenAndRange.Item2.Max + MinInclusive = firstContinuationToken.PartitionRange.Min, + MaxExclusive = firstContinuationToken.PartitionRange.Max }; - int minIndex = partitionKeyRanges.BinarySearch( + int matchedIndex = sortedRanges.Span.BinarySearch( firstContinuationRange, Comparer.Create((range1, range2) => string.CompareOrdinal(range1.MinInclusive, range2.MinInclusive))); - if (minIndex < 0) + if (matchedIndex < 0) { - return TryCatch>.FromException( + return TryCatch>.FromException( new MalformedContinuationTokenException( $"{RMResources.InvalidContinuationToken} - Could not find continuation token: {firstContinuationToken}")); } - foreach (Tuple> suppledContinuationToken in suppliedContinuationTokens) - { - // find what ranges make up the supplied continuation token - TContinuationToken continuationToken = suppledContinuationToken.Item1; - Documents.Routing.Range range = suppledContinuationToken.Item2; - - IEnumerable replacementRanges = partitionKeyRanges - .Where((partitionKeyRange) => - string.CompareOrdinal(range.Min, partitionKeyRange.MinInclusive) <= 0 && - string.CompareOrdinal(range.Max, partitionKeyRange.MaxExclusive) >= 0) - .OrderBy((partitionKeyRange) => partitionKeyRange.MinInclusive); + ReadOnlyMemory partitionsLeftOfTarget = matchedIndex == 0 ? ReadOnlyMemory.Empty : sortedRanges.Slice(start: 0, length: matchedIndex); + ReadOnlyMemory targetPartition = sortedRanges.Slice(start: matchedIndex, length: 1); + ReadOnlyMemory partitionsRightOfTarget = matchedIndex == sortedRanges.Length - 1 ? ReadOnlyMemory.Empty : sortedRanges.Slice(start: matchedIndex + 1); + + // Create the continuation token mapping for each region. + IReadOnlyDictionary mappingForPartitionsLeftOfTarget = MatchRangesToContinuationTokens( + partitionsLeftOfTarget, + partitionedContinuationTokens); + IReadOnlyDictionary mappingForTargetPartition = MatchRangesToContinuationTokens( + targetPartition, + partitionedContinuationTokens); + IReadOnlyDictionary mappingForPartitionsRightOfTarget = MatchRangesToContinuationTokens( + partitionsRightOfTarget, + partitionedContinuationTokens); + + return TryCatch>.FromResult( + new PartitionMapping( + partitionsLeftOfTarget: mappingForPartitionsLeftOfTarget, + targetPartition: mappingForTargetPartition, + partitionsRightOfTarget: mappingForPartitionsRightOfTarget)); + } - // Could not find the child ranges - if (replacementRanges.Count() == 0) - { - return TryCatch>.FromException( - new MalformedContinuationTokenException( - $"{RMResources.InvalidContinuationToken} - Could not find continuation token: {continuationToken}")); - } + /// + /// Matches ranges to their corresponding continuation token. + /// Note that most ranges don't have a corresponding continuation token, so their value will be set to null. + /// Also note that in the event of a split two or more ranges will match to the same continuation token. + /// + /// The type of token we are matching with. + /// The partition key ranges to match. + /// The continuation tokens to match with. + /// A dictionary of ranges matched with their continuation tokens. + public static IReadOnlyDictionary MatchRangesToContinuationTokens( + ReadOnlyMemory partitionKeyRanges, + IReadOnlyList partitionedContinuationTokens) + where PartitionedToken : IPartitionedToken + { + if (partitionedContinuationTokens == null) + { + throw new ArgumentNullException(nameof(partitionedContinuationTokens)); + } - // PMax = C2Max > C2Min > C1Max > C1Min = PMin. - string parentMax = range.Max; - string child2Max = replacementRanges.Last().MaxExclusive; - string child2Min = replacementRanges.Last().MinInclusive; - string child1Max = replacementRanges.First().MaxExclusive; - string child1Min = replacementRanges.First().MinInclusive; - string parentMin = range.Min; - - if (!(parentMax == child2Max && - string.CompareOrdinal(child2Max, child2Min) >= 0 && - (replacementRanges.Count() == 1 ? true : string.CompareOrdinal(child2Min, child1Max) >= 0) && - string.CompareOrdinal(child1Max, child1Min) >= 0 && - child1Min == parentMin)) + Dictionary partitionKeyRangeToToken = new Dictionary(); + ReadOnlySpan partitionKeyRangeSpan = partitionKeyRanges.Span; + for (int i = 0; i < partitionKeyRangeSpan.Length; i++) + { + PartitionKeyRange partitionKeyRange = partitionKeyRangeSpan[i]; + foreach (PartitionedToken partitionedToken in partitionedContinuationTokens) { - return TryCatch>.FromException( - new MalformedContinuationTokenException( - $"{RMResources.InvalidContinuationToken} - PMax = C2Max > C2Min > C1Max > C1Min = PMin: {continuationToken}")); + // See if continuation token includes the range + if ((partitionKeyRange.MinInclusive.CompareTo(partitionedToken.PartitionRange.Min) >= 0) + && (partitionKeyRange.MaxExclusive.CompareTo(partitionedToken.PartitionRange.Max) <= 0)) + { + partitionKeyRangeToToken[partitionKeyRange] = partitionedToken; + break; + } } - foreach (PartitionKeyRange partitionKeyRange in replacementRanges) + if (!partitionKeyRangeToToken.ContainsKey(partitionKeyRange)) { - targetRangeToContinuationTokenMap.Add(partitionKeyRange.Id, continuationToken); + // Could not find a matching token so just set it to null + partitionKeyRangeToToken[partitionKeyRange] = default; } } - return TryCatch>.FromResult( - new InitInfo( - minIndex, - targetRangeToContinuationTokenMap)); + return partitionKeyRangeToToken; } protected virtual long GetAndResetResponseLengthBytes() @@ -672,12 +658,29 @@ public InitInfo(int targetIndex, IReadOnlyDictionary public IReadOnlyDictionary ContinuationTokens { get; } } + public readonly struct PartitionMapping + { + public PartitionMapping( + IReadOnlyDictionary partitionsLeftOfTarget, + IReadOnlyDictionary targetPartition, + IReadOnlyDictionary partitionsRightOfTarget) + { + this.PartitionsLeftOfTarget = partitionsLeftOfTarget ?? throw new ArgumentNullException(nameof(partitionsLeftOfTarget)); + this.TargetPartition = targetPartition ?? throw new ArgumentNullException(nameof(targetPartition)); + this.PartitionsRightOfTarget = partitionsRightOfTarget ?? throw new ArgumentNullException(nameof(partitionsRightOfTarget)); + } + + public IReadOnlyDictionary PartitionsLeftOfTarget { get; } + public IReadOnlyDictionary TargetPartition { get; } + public IReadOnlyDictionary PartitionsRightOfTarget { get; } + } + /// /// All CrossPartitionQueries need this information on top of the parameter for DocumentQueryExecutionContextBase. /// I moved it out into it's own type, so that we don't have to keep passing around all the individual parameters in the factory pattern. /// This also allows us to check the arguments once instead of in each of the constructors. /// - public struct CrossPartitionInitParams + public readonly struct CrossPartitionInitParams { /// /// Initializes a new instance of the InitParams struct. diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.ContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.ContinuationToken.cs new file mode 100644 index 0000000000..c0df108ecd --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.ContinuationToken.cs @@ -0,0 +1,149 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; + + internal sealed partial class CosmosOrderByItemQueryExecutionContext + { + /// + /// Gets the continuation token for an order by query. + /// + protected override string ContinuationToken + { + // In general the continuation token for order by queries contains the following information: + // 1) What partition did we leave off on + // 2) What value did we leave off + // Along with the constraints that we get from how we drain the documents: + // Let mean that the last item we drained was item x from partition y. + // Then we know that for all partitions + // * < y that we have drained all items <= x + // * > y that we have drained all items < x + // * = y that we have drained all items <= x based on the backend continuation token for y + // With this information we have captured the progress for all partitions in a single continuation token. + get + { + IEnumerable activeItemProducers = this.GetActiveItemProducers(); + string continuationToken; + if (activeItemProducers.Any()) + { + IEnumerable orderByContinuationTokens = activeItemProducers.Select((itemProducer) => + { + OrderByQueryResult orderByQueryResult = new OrderByQueryResult(itemProducer.Current); + string filter = itemProducer.Filter; + OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( + new CompositeContinuationToken + { + Token = itemProducer.PreviousContinuationToken, + Range = itemProducer.PartitionKeyRange.ToRange(), + }, + orderByQueryResult.OrderByItems, + orderByQueryResult.Rid, + this.ShouldIncrementSkipCount(itemProducer) ? this.skipCount + 1 : 0, + filter); + + return OrderByContinuationToken.ToCosmosElement(orderByContinuationToken); + }); + + continuationToken = CosmosArray.Create(orderByContinuationTokens).ToString(); + } + else + { + continuationToken = null; + } + + // Note we are no longer escaping non ascii continuation tokens. + // It is the callers job to encode a continuation token before adding it to a header in their service. + + return continuationToken; + } + } + + public override CosmosElement GetCosmosElementContinuationToken() + { + IEnumerable activeItemProducers = this.GetActiveItemProducers(); + if (!activeItemProducers.Any()) + { + return default; + } + + List orderByContinuationTokens = new List(); + foreach (ItemProducer activeItemProducer in activeItemProducers) + { + OrderByQueryResult orderByQueryResult = new OrderByQueryResult(activeItemProducer.Current); + OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( + compositeContinuationToken: new CompositeContinuationToken() + { + Token = activeItemProducer.PreviousContinuationToken, + Range = new Documents.Routing.Range( + min: activeItemProducer.PartitionKeyRange.MinInclusive, + max: activeItemProducer.PartitionKeyRange.MaxExclusive, + isMinInclusive: true, + isMaxInclusive: false) + }, + orderByItems: orderByQueryResult.OrderByItems, + rid: orderByQueryResult.Rid, + skipCount: this.ShouldIncrementSkipCount(activeItemProducer) ? this.skipCount + 1 : 0, + filter: activeItemProducer.Filter); + + CosmosElement cosmosElementToken = OrderByContinuationToken.ToCosmosElement(orderByContinuationToken); + orderByContinuationTokens.Add(cosmosElementToken); + } + + return CosmosArray.Create(orderByContinuationTokens); + } + + /// + /// Equality comparer used to determine if a document producer needs it's continuation token returned. + /// Basically just says that the continuation token can be flushed once you stop seeing duplicates. + /// + private sealed class OrderByEqualityComparer : IEqualityComparer + { + /// + /// The order by comparer. + /// + private readonly OrderByItemProducerTreeComparer orderByConsumeComparer; + + /// + /// Initializes a new instance of the OrderByEqualityComparer class. + /// + /// The order by consume comparer. + public OrderByEqualityComparer(OrderByItemProducerTreeComparer orderByConsumeComparer) + { + this.orderByConsumeComparer = orderByConsumeComparer ?? throw new ArgumentNullException($"{nameof(orderByConsumeComparer)} can not be null."); + } + + /// + /// Gets whether two OrderByQueryResult instances are equal. + /// + /// The first. + /// The second. + /// Whether two OrderByQueryResult instances are equal. + public bool Equals(CosmosElement x, CosmosElement y) + { + OrderByQueryResult orderByQueryResultX = new OrderByQueryResult(x); + OrderByQueryResult orderByQueryResultY = new OrderByQueryResult(y); + return this.orderByConsumeComparer.CompareOrderByItems( + orderByQueryResultX.OrderByItems, + orderByQueryResultY.OrderByItems) == 0; + } + + /// + /// Gets the hash code for object. + /// + /// The object to hash. + /// The hash code for the OrderByQueryResult object. + public int GetHashCode(CosmosElement obj) + { + return 0; + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Drain.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Drain.cs new file mode 100644 index 0000000000..b0c3a796b9 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Drain.cs @@ -0,0 +1,145 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + + internal sealed partial class CosmosOrderByItemQueryExecutionContext + { + /// + /// Drains a page of documents from this context. + /// + /// The maximum number of elements. + /// The cancellation token. + /// A task that when awaited on return a page of documents. + public override async Task DrainAsync(int maxElements, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + //// In order to maintain the continuation token for the user we must drain with a few constraints + //// 1) We always drain from the partition, which has the highest priority item first + //// 2) If multiple partitions have the same priority item then we drain from the left most first + //// otherwise we would need to keep track of how many of each item we drained from each partition + //// (just like parallel queries). + //// Visually that look the following case where we have three partitions that are numbered and store letters. + //// For teaching purposes I have made each item a tuple of the following form: + //// + //// So that duplicates across partitions are distinct, but duplicates within partitions are indistinguishable. + //// |-------| |-------| |-------| + //// | | | | | | + //// | | | | | | + //// | | | | | | + //// | | | | | | + //// | | | | | | + //// | | | | | | + //// | | | | | | + //// |-------| |-------| |-------| + //// Now the correct drain order in this case is: + //// ,,,,,,,,,,, + //// ,,,,,,,,, + //// In more mathematical terms + //// 1) always comes before where x < z + //// 2) always come before where j < k + + List results = new List(); + while (results.Count < maxElements) + { + // Only drain from the highest priority document producer + // We need to pop and push back the document producer tree, since the priority changes according to the sort order. + ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree(); + try + { + if (!currentItemProducerTree.HasMoreResults) + { + // This means there are no more items to drain + break; + } + + OrderByQueryResult orderByQueryResult = new OrderByQueryResult(currentItemProducerTree.Current); + + // Only add the payload, since other stuff is garbage from the caller's perspective. + results.Add(orderByQueryResult.Payload); + + // If we are at the beginning of the page and seeing an rid from the previous page we should increment the skip count + // due to the fact that JOINs can make a document appear multiple times and across continuations, so we don't want to + // surface this more than needed. More information can be found in the continuation token docs. + if (this.ShouldIncrementSkipCount(currentItemProducerTree.CurrentItemProducerTree.Root)) + { + ++this.skipCount; + } + else + { + this.skipCount = 0; + } + + this.previousRid = orderByQueryResult.Rid; + this.previousOrderByItems = orderByQueryResult.OrderByItems; + + if (!currentItemProducerTree.TryMoveNextDocumentWithinPage()) + { + while (true) + { + (bool movedToNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken); + if (!movedToNextPage) + { + if (failureResponse.HasValue) + { + // TODO: We can buffer this failure so that the user can still get the pages we already got. + return failureResponse.Value; + } + + break; + } + + if (currentItemProducerTree.IsAtBeginningOfPage) + { + break; + } + + if (currentItemProducerTree.TryMoveNextDocumentWithinPage()) + { + break; + } + } + } + } + finally + { + this.PushCurrentItemProducerTree(currentItemProducerTree); + } + } + + return QueryResponseCore.CreateSuccess( + result: results, + requestCharge: this.requestChargeTracker.GetAndResetCharge(), + activityId: null, + responseLengthBytes: this.GetAndResetResponseLengthBytes(), + disallowContinuationTokenMessage: null, + continuationToken: this.ContinuationToken, + diagnostics: this.GetAndResetDiagnostics()); + } + + /// + /// Gets whether or not we should increment the skip count based on the rid of the document. + /// + /// The current document producer. + /// Whether or not we should increment the skip count. + private bool ShouldIncrementSkipCount(ItemProducer currentItemProducer) + { + // If we are not at the beginning of the page and we saw the same rid again. + return !currentItemProducer.IsAtBeginningOfPage && + string.Equals( + this.previousRid, + new OrderByQueryResult(currentItemProducer.Current).Rid, + StringComparison.Ordinal); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Resume.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Resume.cs new file mode 100644 index 0000000000..9a80e95890 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Resume.cs @@ -0,0 +1,632 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; + using Microsoft.Azure.Cosmos.Query.Core.Exceptions; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + using PartitionKeyRange = Documents.PartitionKeyRange; + using ResourceId = Documents.ResourceId; + + internal sealed partial class CosmosOrderByItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext + { + private static class Expressions + { + public const string LessThan = "<"; + public const string LessThanOrEqualTo = "<="; + public const string EqualTo = "="; + public const string GreaterThan = ">"; + public const string GreaterThanOrEqualTo = ">="; + } + + public static async Task> TryCreateAsync( + CosmosQueryContext queryContext, + CosmosCrossPartitionQueryExecutionContext.CrossPartitionInitParams initParams, + CosmosElement requestContinuationToken, + CancellationToken cancellationToken) + { + Debug.Assert( + initParams.PartitionedQueryExecutionInfo.QueryInfo.HasOrderBy, + "OrderBy~Context must have order by query info."); + + if (queryContext == null) + { + throw new ArgumentNullException(nameof(queryContext)); + } + + cancellationToken.ThrowIfCancellationRequested(); + + // TODO (brchon): For now we are not honoring non deterministic ORDER BY queries, since there is a bug in the continuation logic. + // We can turn it back on once the bug is fixed. + // This shouldn't hurt any query results. + OrderByItemProducerTreeComparer orderByItemProducerTreeComparer = new OrderByItemProducerTreeComparer(initParams.PartitionedQueryExecutionInfo.QueryInfo.OrderBy.ToArray()); + CosmosOrderByItemQueryExecutionContext context = new CosmosOrderByItemQueryExecutionContext( + initPararms: queryContext, + maxConcurrency: initParams.MaxConcurrency, + maxItemCount: initParams.MaxItemCount, + maxBufferedItemCount: initParams.MaxBufferedItemCount, + consumeComparer: orderByItemProducerTreeComparer, + testSettings: initParams.TestSettings); + + IReadOnlyList orderByExpressions = initParams.PartitionedQueryExecutionInfo.QueryInfo.OrderByExpressions; + IReadOnlyList sortOrders = initParams.PartitionedQueryExecutionInfo.QueryInfo.OrderBy; + if (orderByExpressions.Count != sortOrders.Count) + { + throw new ArgumentException("order by expressions count does not match sort order"); + } + + IReadOnlyList columns = orderByExpressions + .Zip(sortOrders, (expression, order) => new OrderByColumn(expression, order)) + .ToList(); + + return (await context.TryInitializeAsync( + sqlQuerySpec: initParams.SqlQuerySpec, + requestContinuation: requestContinuationToken, + collectionRid: initParams.CollectionRid, + partitionKeyRanges: initParams.PartitionKeyRanges, + initialPageSize: initParams.InitialPageSize, + orderByColumns: columns, + cancellationToken: cancellationToken)) + .Try(() => context); + } + + private async Task TryInitializeAsync( + SqlQuerySpec sqlQuerySpec, + CosmosElement requestContinuation, + string collectionRid, + IReadOnlyList partitionKeyRanges, + int initialPageSize, + IReadOnlyList orderByColumns, + CancellationToken cancellationToken) + { + if (sqlQuerySpec == null) + { + throw new ArgumentNullException(nameof(sqlQuerySpec)); + } + + if (collectionRid == null) + { + throw new ArgumentNullException(nameof(collectionRid)); + } + + if (partitionKeyRanges == null) + { + throw new ArgumentNullException(nameof(partitionKeyRanges)); + } + + if (orderByColumns == null) + { + throw new ArgumentNullException(nameof(orderByColumns)); + } + + cancellationToken.ThrowIfCancellationRequested(); + + if (requestContinuation == null) + { + // Start off all the partition key ranges with null continuation + SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec( + sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: True), + sqlQuerySpec.Parameters); + Dictionary partitionKeyRangeToContinuationToken = new Dictionary(); + foreach (PartitionKeyRange partitionKeyRange in partitionKeyRanges) + { + partitionKeyRangeToContinuationToken.Add(key: partitionKeyRange, value: null); + } + + return await base.TryInitializeAsync( + collectionRid, + initialPageSize, + rewrittenQueryForOrderBy, + partitionKeyRangeToContinuationToken, + deferFirstPage: false, + filter: null, + tryFilterAsync: null, + cancellationToken); + } + + TryCatch> tryGetOrderByContinuationTokenMapping = TryGetOrderByContinuationTokenMapping( + partitionKeyRanges, + requestContinuation, + orderByColumns.Count); + if (!tryGetOrderByContinuationTokenMapping.Succeeded) + { + return TryCatch.FromException(tryGetOrderByContinuationTokenMapping.Exception); + } + + IReadOnlyList orderByItems = tryGetOrderByContinuationTokenMapping + .Result + .TargetPartition + .Values + .First() + .OrderByItems + .Select(x => x.Item) + .ToList(); + if (orderByItems.Count != orderByColumns.Count) + { + return TryCatch.FromException( + new MalformedContinuationTokenException($"Order By Items from continuation token did not match the query text. Order by item count: {orderByItems.Count()} did not match column count {orderByColumns.Count()}. Continuation token: {requestContinuation}")); + } + + ReadOnlyMemory<(OrderByColumn, CosmosElement)> columnAndItems = orderByColumns.Zip(orderByItems, (column, item) => (column, item)).ToArray(); + + // For ascending order-by, left of target partition has filter expression > value, + // right of target partition has filter expression >= value, + // and target partition takes the previous filter from continuation (or true if no continuation) + (string leftFilter, string targetFilter, string rightFilter) = CosmosOrderByItemQueryExecutionContext.GetFormattedFilters(columnAndItems); + List<(IReadOnlyDictionary, string)> tokenMappingAndFilters = new List<(IReadOnlyDictionary, string)>() + { + { (tryGetOrderByContinuationTokenMapping.Result.PartitionsLeftOfTarget, leftFilter) }, + { (tryGetOrderByContinuationTokenMapping.Result.TargetPartition, targetFilter) }, + { (tryGetOrderByContinuationTokenMapping.Result.PartitionsRightOfTarget, rightFilter) }, + }; + + IReadOnlyList sortOrders = orderByColumns.Select(column => column.SortOrder).ToList(); + foreach ((IReadOnlyDictionary tokenMapping, string filter) in tokenMappingAndFilters) + { + SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec( + sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: filter), + sqlQuerySpec.Parameters); + + TryCatch tryInitialize = await base.TryInitializeAsync( + collectionRid, + initialPageSize, + rewrittenQueryForOrderBy, + tokenMapping.ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.CompositeContinuationToken.Token), + deferFirstPage: false, + filter, + tryFilterAsync: async (itemProducerTree) => + { + if (!tokenMapping.TryGetValue( + itemProducerTree.Root.PartitionKeyRange, + out OrderByContinuationToken continuationToken)) + { + throw new InvalidOperationException($"Failed to retrieve {nameof(OrderByContinuationToken)}."); + } + + if (continuationToken == null) + { + return TryCatch.FromResult(); + } + + return await this.TryFilterAsync( + itemProducerTree, + sortOrders, + continuationToken, + cancellationToken); + }, + cancellationToken); + if (!tryInitialize.Succeeded) + { + return tryInitialize; + } + } + + return TryCatch.FromResult(); + } + + private static TryCatch> TryGetOrderByContinuationTokenMapping( + IReadOnlyList partitionKeyRanges, + CosmosElement continuationToken, + int numOrderByItems) + { + if (partitionKeyRanges == null) + { + throw new ArgumentOutOfRangeException(nameof(partitionKeyRanges)); + } + + if (numOrderByItems < 0) + { + throw new ArgumentOutOfRangeException(nameof(numOrderByItems)); + } + + if (continuationToken == null) + { + throw new ArgumentNullException(nameof(continuationToken)); + } + + TryCatch> tryExtractContinuationTokens = TryExtractContinuationTokens(continuationToken, numOrderByItems); + if (!tryExtractContinuationTokens.Succeeded) + { + return TryCatch>.FromException(tryExtractContinuationTokens.Exception); + } + + return TryGetInitializationInfo( + partitionKeyRanges, + tryExtractContinuationTokens.Result); + } + + private static TryCatch> TryExtractContinuationTokens( + CosmosElement requestContinuation, + int numOrderByItems) + { + if (requestContinuation == null) + { + throw new ArgumentNullException("continuation can not be null or empty."); + } + + if (numOrderByItems < 0) + { + throw new ArgumentOutOfRangeException(nameof(numOrderByItems)); + } + + if (!(requestContinuation is CosmosArray cosmosArray)) + { + return TryCatch>.FromException( + new MalformedContinuationTokenException( + $"Order by continuation token must be an array: {requestContinuation}.")); + } + + List orderByContinuationTokens = new List(); + foreach (CosmosElement arrayItem in cosmosArray) + { + TryCatch tryCreateOrderByContinuationToken = OrderByContinuationToken.TryCreateFromCosmosElement(arrayItem); + if (!tryCreateOrderByContinuationToken.Succeeded) + { + return TryCatch>.FromException(tryCreateOrderByContinuationToken.Exception); + } + + orderByContinuationTokens.Add(tryCreateOrderByContinuationToken.Result); + } + + if (orderByContinuationTokens.Count == 0) + { + return TryCatch>.FromException( + new MalformedContinuationTokenException( + $"Order by continuation token cannot be empty: {requestContinuation}.")); + } + + foreach (OrderByContinuationToken suppliedOrderByContinuationToken in orderByContinuationTokens) + { + if (suppliedOrderByContinuationToken.OrderByItems.Count != numOrderByItems) + { + return TryCatch>.FromException( + new MalformedContinuationTokenException( + $"Invalid order-by items in continuation token {requestContinuation} for OrderBy~Context.")); + } + } + + return TryCatch>.FromResult(orderByContinuationTokens); + } + + /// + /// When resuming an order by query we need to filter the document producers. + /// + /// The producer to filter down. + /// The sort orders. + /// The continuation token. + /// The cancellation token. + /// A task to await on. + private async Task TryFilterAsync( + ItemProducerTree producer, + IReadOnlyList sortOrders, + OrderByContinuationToken continuationToken, + CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + // When we resume a query on a partition there is a possibility that we only read a partial page from the backend + // meaning that will we repeat some documents if we didn't do anything about it. + // The solution is to filter all the documents that come before in the sort order, since we have already emitted them to the client. + // The key is to seek until we get an order by value that matches the order by value we left off on. + // Once we do that we need to seek to the correct _rid within the term, + // since there might be many documents with the same order by value we left off on. + + foreach (ItemProducerTree tree in producer) + { + if (!ResourceId.TryParse(continuationToken.Rid, out ResourceId continuationRid)) + { + return TryCatch.FromException( + new MalformedContinuationTokenException( + $"Invalid Rid in the continuation token {continuationToken.CompositeContinuationToken.Token} for OrderBy~Context.")); + } + + Dictionary resourceIds = new Dictionary(); + int itemToSkip = continuationToken.SkipCount; + bool continuationRidVerified = false; + + while (true) + { + if (tree.Current == null) + { + // This document producer doesn't have anymore items. + break; + } + + OrderByQueryResult orderByResult = new OrderByQueryResult(tree.Current); + // Throw away documents until it matches the item from the continuation token. + int cmp = 0; + for (int i = 0; i < sortOrders.Count; ++i) + { + cmp = ItemComparer.Instance.Compare( + continuationToken.OrderByItems[i].Item, + orderByResult.OrderByItems[i].Item); + + if (cmp != 0) + { + cmp = sortOrders[i] == SortOrder.Ascending ? cmp : -cmp; + break; + } + } + + if (cmp < 0) + { + // We might have passed the item due to deletions and filters. + break; + } + + if (cmp == 0) + { + if (!resourceIds.TryGetValue(orderByResult.Rid, out ResourceId rid)) + { + if (!ResourceId.TryParse(orderByResult.Rid, out rid)) + { + return TryCatch.FromException( + new MalformedContinuationTokenException( + $"Invalid Rid in the continuation token {continuationToken.CompositeContinuationToken.Token} for OrderBy~Context~TryParse.")); + } + + resourceIds.Add(orderByResult.Rid, rid); + } + + if (!continuationRidVerified) + { + if (continuationRid.Database != rid.Database || continuationRid.DocumentCollection != rid.DocumentCollection) + { + return TryCatch.FromException( + new MalformedContinuationTokenException( + $"Invalid Rid in the continuation token {continuationToken.CompositeContinuationToken.Token} for OrderBy~Context.")); + } + + continuationRidVerified = true; + } + + // Once the item matches the order by items from the continuation tokens + // We still need to remove all the documents that have a lower rid in the rid sort order. + // If there is a tie in the sort order the documents should be in _rid order in the same direction as the first order by field. + // So if it's ORDER BY c.age ASC, c.name DESC the _rids are ASC + // If ti's ORDER BY c.age DESC, c.name DESC the _rids are DESC + cmp = continuationRid.Document.CompareTo(rid.Document); + if (sortOrders[0] == SortOrder.Descending) + { + cmp = -cmp; + } + + // We might have passed the item due to deletions and filters. + // We also have a skip count for JOINs + if (cmp < 0 || (cmp == 0 && itemToSkip-- <= 0)) + { + break; + } + } + + if (!tree.TryMoveNextDocumentWithinPage()) + { + while (true) + { + (bool successfullyMovedNext, QueryResponseCore? failureResponse) = await tree.TryMoveNextPageAsync(cancellationToken); + if (!successfullyMovedNext) + { + if (failureResponse.HasValue) + { + return TryCatch.FromException( + failureResponse.Value.CosmosException); + } + + break; + } + + if (tree.IsAtBeginningOfPage) + { + break; + } + + if (tree.TryMoveNextDocumentWithinPage()) + { + break; + } + } + } + } + } + + return TryCatch.FromResult(); + } + + private static void AppendToBuilders((StringBuilder leftFilter, StringBuilder targetFilter, StringBuilder rightFilter) builders, object str) + { + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, str, str, str); + } + + private static void AppendToBuilders((StringBuilder leftFilter, StringBuilder targetFilter, StringBuilder rightFilter) builders, object left, object target, object right) + { + builders.leftFilter.Append(left); + builders.targetFilter.Append(target); + builders.rightFilter.Append(right); + } + + private static (string leftFilter, string targetFilter, string rightFilter) GetFormattedFilters( + ReadOnlyMemory<(OrderByColumn orderByColumn, CosmosElement orderByItem)> columnAndItems) + { + // When we run cross partition queries, + // we only serialize the continuation token for the partition that we left off on. + // The only problem is that when we resume the order by query, + // we don't have continuation tokens for all other partition. + // The saving grace is that the data has a composite sort order(query sort order, partition key range id) + // so we can generate range filters which in turn the backend will turn into rid based continuation tokens, + // which is enough to get the streams of data flowing from all partitions. + // The details of how this is done is described below: + int numOrderByItems = columnAndItems.Length; + bool isSingleOrderBy = numOrderByItems == 1; + StringBuilder left = new StringBuilder(); + StringBuilder target = new StringBuilder(); + StringBuilder right = new StringBuilder(); + + (StringBuilder, StringBuilder, StringBuilder) builders = (left, target, right); + + if (isSingleOrderBy) + { + //For a single order by query we resume the continuations in this manner + // Suppose the query is SELECT* FROM c ORDER BY c.string ASC + // And we left off on partition N with the value "B" + // Then + // All the partitions to the left will have finished reading "B" + // Partition N is still reading "B" + // All the partitions to the right have let to read a "B + // Therefore the filters should be + // > "B" , >= "B", and >= "B" respectively + // Repeat the same logic for DESC and you will get + // < "B", <= "B", and <= "B" respectively + // The general rule becomes + // For ASC + // > for partitions to the left + // >= for the partition we left off on + // >= for the partitions to the right + // For DESC + // < for partitions to the left + // <= for the partition we left off on + // <= for the partitions to the right + (OrderByColumn orderByColumn, CosmosElement orderByItem) = columnAndItems.Span[0]; + (string expression, SortOrder sortOrder) = (orderByColumn.Expression, orderByColumn.SortOrder); + + StringBuilder sb = new StringBuilder(); + CosmosElementToQueryLiteral cosmosElementToQueryLiteral = new CosmosElementToQueryLiteral(sb); + orderByItem.Accept(cosmosElementToQueryLiteral); + + string orderByItemToString = sb.ToString(); + + left.Append($"{expression} {(sortOrder == SortOrder.Descending ? Expressions.LessThan : Expressions.GreaterThan)} {orderByItemToString}"); + target.Append($"{expression} {(sortOrder == SortOrder.Descending ? Expressions.LessThanOrEqualTo : Expressions.GreaterThanOrEqualTo)} {orderByItemToString}"); + right.Append($"{expression} {(sortOrder == SortOrder.Descending ? Expressions.LessThanOrEqualTo : Expressions.GreaterThanOrEqualTo)} {orderByItemToString}"); + } + else + { + //For a multi order by query + // Suppose the query is SELECT* FROM c ORDER BY c.string ASC, c.number ASC + // And we left off on partition N with the value("A", 1) + // Then + // All the partitions to the left will have finished reading("A", 1) + // Partition N is still reading("A", 1) + // All the partitions to the right have let to read a "(A", 1) + // The filters are harder to derive since their are multiple columns + // But the problem reduces to "How do you know one document comes after another in a multi order by query" + // The answer is to just look at it one column at a time. + // For this particular scenario: + // If a first column is greater ex. ("B", blah), then the document comes later in the sort order + // Therefore we want all documents where the first column is greater than "A" which means > "A" + // Or if the first column is a tie, then you look at the second column ex. ("A", blah). + // Therefore we also want all documents where the first column was a tie but the second column is greater which means = "A" AND > 1 + // Therefore the filters should be + // (> "A") OR (= "A" AND > 1), (> "A") OR (= "A" AND >= 1), (> "A") OR (= "A" AND >= 1) + // Notice that if we repeated the same logic we for single order by we would have gotten + // > "A" AND > 1, >= "A" AND >= 1, >= "A" AND >= 1 + // which is wrong since we missed some documents + // Repeat the same logic for ASC, DESC + // (> "A") OR (= "A" AND < 1), (> "A") OR (= "A" AND <= 1), (> "A") OR (= "A" AND <= 1) + // Again for DESC, ASC + // (< "A") OR (= "A" AND > 1), (< "A") OR (= "A" AND >= 1), (< "A") OR (= "A" AND >= 1) + // And again for DESC DESC + // (< "A") OR (= "A" AND < 1), (< "A") OR (= "A" AND <= 1), (< "A") OR (= "A" AND <= 1) + // The general we look at all prefixes of the order by columns to look for tie breakers. + // Except for the full prefix whose last column follows the rules for single item order by + // And then you just OR all the possibilities together + for (int prefixLength = 1; prefixLength <= numOrderByItems; prefixLength++) + { + ReadOnlySpan<(OrderByColumn orderByColumn, CosmosElement orderByItem)> columnAndItemPrefix = columnAndItems.Span.Slice(start: 0, length: prefixLength); + + bool lastPrefix = prefixLength == numOrderByItems; + bool firstPrefix = prefixLength == 1; + + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, "("); + + for (int index = 0; index < prefixLength; index++) + { + string expression = columnAndItemPrefix[index].orderByColumn.Expression; + SortOrder sortOrder = columnAndItemPrefix[index].orderByColumn.SortOrder; + CosmosElement orderByItem = columnAndItemPrefix[index].orderByItem; + bool lastItem = index == prefixLength - 1; + + // Append Expression + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, expression); + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " "); + + // Append binary operator + if (lastItem) + { + string inequality = sortOrder == SortOrder.Descending ? Expressions.LessThan : Expressions.GreaterThan; + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, inequality); + if (lastPrefix) + { + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, string.Empty, Expressions.EqualTo, Expressions.EqualTo); + } + } + else + { + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, Expressions.EqualTo); + } + + // Append SortOrder + StringBuilder sb = new StringBuilder(); + CosmosElementToQueryLiteral cosmosElementToQueryLiteral = new CosmosElementToQueryLiteral(sb); + orderByItem.Accept(cosmosElementToQueryLiteral); + string orderByItemToString = sb.ToString(); + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " "); + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, orderByItemToString); + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " "); + + if (!lastItem) + { + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, "AND "); + } + } + + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, ")"); + if (!lastPrefix) + { + CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " OR "); + } + } + } + + return (left.ToString(), target.ToString(), right.ToString()); + } + + private readonly struct OrderByInitInfo + { + public OrderByInitInfo( + RangeFilterInitializationInfo[] filters, + IReadOnlyDictionary continuationTokens) + { + this.Filters = filters; + this.ContinuationTokens = continuationTokens; + } + + public RangeFilterInitializationInfo[] Filters { get; } + + public IReadOnlyDictionary ContinuationTokens { get; } + } + + private readonly struct OrderByColumn + { + public OrderByColumn(string expression, SortOrder sortOrder) + { + this.Expression = expression ?? throw new ArgumentNullException(nameof(expression)); + this.SortOrder = sortOrder; + } + + public string Expression { get; } + public SortOrder SortOrder { get; } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.cs index 6c534ec9bd..8debd89d5a 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.cs @@ -1,26 +1,14 @@ -//------------------------------------------------------------ +// ------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. -//------------------------------------------------------------ +// ------------------------------------------------------------ + namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy { using System; using System.Collections.Generic; - using System.Diagnostics; - using System.Linq; - using System.Text; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Query.Core; - using Microsoft.Azure.Cosmos.Query.Core.Collections; - using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; - using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.QueryClient; - using PartitionKeyRange = Documents.PartitionKeyRange; - using ResourceId = Documents.ResourceId; /// /// CosmosOrderByItemQueryExecutionContext is a concrete implementation for CrossPartitionQueryExecutionContext. @@ -30,7 +18,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy /// This way we can generate a single continuation token for all n partitions. /// This class is able to stop and resume execution by generating continuation tokens and reconstructing an execution context from said token. /// - internal sealed class CosmosOrderByItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext + internal sealed partial class CosmosOrderByItemQueryExecutionContext { /// /// Order by queries are rewritten to allow us to inject a filter. @@ -99,930 +87,5 @@ private CosmosOrderByItemQueryExecutionContext( testSettings: testSettings) { } - - /// - /// Gets the continuation token for an order by query. - /// - protected override string ContinuationToken - { - // In general the continuation token for order by queries contains the following information: - // 1) What partition did we leave off on - // 2) What value did we leave off - // Along with the constraints that we get from how we drain the documents: - // Let mean that the last item we drained was item x from partition y. - // Then we know that for all partitions - // * < y that we have drained all items <= x - // * > y that we have drained all items < x - // * = y that we have drained all items <= x based on the backend continuation token for y - // With this information we have captured the progress for all partitions in a single continuation token. - get - { - IEnumerable activeItemProducers = this.GetActiveItemProducers(); - string continuationToken; - if (activeItemProducers.Any()) - { - IEnumerable orderByContinuationTokens = activeItemProducers.Select((itemProducer) => - { - OrderByQueryResult orderByQueryResult = new OrderByQueryResult(itemProducer.Current); - string filter = itemProducer.Filter; - OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( - new CompositeContinuationToken - { - Token = itemProducer.PreviousContinuationToken, - Range = itemProducer.PartitionKeyRange.ToRange(), - }, - orderByQueryResult.OrderByItems, - orderByQueryResult.Rid, - this.ShouldIncrementSkipCount(itemProducer) ? this.skipCount + 1 : 0, - filter); - - return OrderByContinuationToken.ToCosmosElement(orderByContinuationToken); - }); - - continuationToken = CosmosArray.Create(orderByContinuationTokens).ToString(); - } - else - { - continuationToken = null; - } - - // Note we are no longer escaping non ascii continuation tokens. - // It is the callers job to encode a continuation token before adding it to a header in their service. - - return continuationToken; - } - } - - public static async Task> TryCreateAsync( - CosmosQueryContext queryContext, - CosmosCrossPartitionQueryExecutionContext.CrossPartitionInitParams initParams, - CosmosElement requestContinuationToken, - CancellationToken cancellationToken) - { - Debug.Assert( - initParams.PartitionedQueryExecutionInfo.QueryInfo.HasOrderBy, - "OrderBy~Context must have order by query info."); - - if (queryContext == null) - { - throw new ArgumentNullException(nameof(queryContext)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - // TODO (brchon): For now we are not honoring non deterministic ORDER BY queries, since there is a bug in the continuation logic. - // We can turn it back on once the bug is fixed. - // This shouldn't hurt any query results. - OrderByItemProducerTreeComparer orderByItemProducerTreeComparer = new OrderByItemProducerTreeComparer(initParams.PartitionedQueryExecutionInfo.QueryInfo.OrderBy.ToArray()); - CosmosOrderByItemQueryExecutionContext context = new CosmosOrderByItemQueryExecutionContext( - initPararms: queryContext, - maxConcurrency: initParams.MaxConcurrency, - maxItemCount: initParams.MaxItemCount, - maxBufferedItemCount: initParams.MaxBufferedItemCount, - consumeComparer: orderByItemProducerTreeComparer, - testSettings: initParams.TestSettings); - - return (await context.TryInitializeAsync( - sqlQuerySpec: initParams.SqlQuerySpec, - requestContinuation: requestContinuationToken, - collectionRid: initParams.CollectionRid, - partitionKeyRanges: initParams.PartitionKeyRanges, - initialPageSize: initParams.InitialPageSize, - sortOrders: initParams.PartitionedQueryExecutionInfo.QueryInfo.OrderBy.ToArray(), - orderByExpressions: initParams.PartitionedQueryExecutionInfo.QueryInfo.OrderByExpressions.ToArray(), - cancellationToken: cancellationToken)) - .Try(() => context); - } - - /// - /// Drains a page of documents from this context. - /// - /// The maximum number of elements. - /// The cancellation token. - /// A task that when awaited on return a page of documents. - public override async Task DrainAsync(int maxElements, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - //// In order to maintain the continuation token for the user we must drain with a few constraints - //// 1) We always drain from the partition, which has the highest priority item first - //// 2) If multiple partitions have the same priority item then we drain from the left most first - //// otherwise we would need to keep track of how many of each item we drained from each partition - //// (just like parallel queries). - //// Visually that look the following case where we have three partitions that are numbered and store letters. - //// For teaching purposes I have made each item a tuple of the following form: - //// - //// So that duplicates across partitions are distinct, but duplicates within partitions are indistinguishable. - //// |-------| |-------| |-------| - //// | | | | | | - //// | | | | | | - //// | | | | | | - //// | | | | | | - //// | | | | | | - //// | | | | | | - //// | | | | | | - //// |-------| |-------| |-------| - //// Now the correct drain order in this case is: - //// ,,,,,,,,,,, - //// ,,,,,,,,, - //// In more mathematical terms - //// 1) always comes before where x < z - //// 2) always come before where j < k - - List results = new List(); - while (results.Count < maxElements) - { - // Only drain from the highest priority document producer - // We need to pop and push back the document producer tree, since the priority changes according to the sort order. - ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree(); - try - { - if (!currentItemProducerTree.HasMoreResults) - { - // This means there are no more items to drain - break; - } - - OrderByQueryResult orderByQueryResult = new OrderByQueryResult(currentItemProducerTree.Current); - - // Only add the payload, since other stuff is garbage from the caller's perspective. - results.Add(orderByQueryResult.Payload); - - // If we are at the beginning of the page and seeing an rid from the previous page we should increment the skip count - // due to the fact that JOINs can make a document appear multiple times and across continuations, so we don't want to - // surface this more than needed. More information can be found in the continuation token docs. - if (this.ShouldIncrementSkipCount(currentItemProducerTree.CurrentItemProducerTree.Root)) - { - ++this.skipCount; - } - else - { - this.skipCount = 0; - } - - this.previousRid = orderByQueryResult.Rid; - this.previousOrderByItems = orderByQueryResult.OrderByItems; - - if (!currentItemProducerTree.TryMoveNextDocumentWithinPage()) - { - while (true) - { - (bool movedToNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken); - if (!movedToNextPage) - { - if (failureResponse.HasValue) - { - // TODO: We can buffer this failure so that the user can still get the pages we already got. - return failureResponse.Value; - } - - break; - } - - if (currentItemProducerTree.IsAtBeginningOfPage) - { - break; - } - - if (currentItemProducerTree.TryMoveNextDocumentWithinPage()) - { - break; - } - } - } - } - finally - { - this.PushCurrentItemProducerTree(currentItemProducerTree); - } - } - - return QueryResponseCore.CreateSuccess( - result: results, - requestCharge: this.requestChargeTracker.GetAndResetCharge(), - activityId: null, - responseLengthBytes: this.GetAndResetResponseLengthBytes(), - disallowContinuationTokenMessage: null, - continuationToken: this.ContinuationToken, - diagnostics: this.GetAndResetDiagnostics()); - } - - /// - /// Gets whether or not we should increment the skip count based on the rid of the document. - /// - /// The current document producer. - /// Whether or not we should increment the skip count. - private bool ShouldIncrementSkipCount(ItemProducer currentItemProducer) - { - // If we are not at the beginning of the page and we saw the same rid again. - return !currentItemProducer.IsAtBeginningOfPage && - string.Equals( - this.previousRid, - new OrderByQueryResult(currentItemProducer.Current).Rid, - StringComparison.Ordinal); - } - - private async Task TryInitializeAsync( - SqlQuerySpec sqlQuerySpec, - CosmosElement requestContinuation, - string collectionRid, - List partitionKeyRanges, - int initialPageSize, - SortOrder[] sortOrders, - string[] orderByExpressions, - CancellationToken cancellationToken) - { - if (sqlQuerySpec == null) - { - throw new ArgumentNullException(nameof(sqlQuerySpec)); - } - - if (collectionRid == null) - { - throw new ArgumentNullException(nameof(collectionRid)); - } - - if (partitionKeyRanges == null) - { - throw new ArgumentNullException(nameof(partitionKeyRanges)); - } - - if (sortOrders == null) - { - throw new ArgumentNullException(nameof(sortOrders)); - } - - if (orderByExpressions == null) - { - throw new ArgumentNullException(nameof(orderByExpressions)); - } - - cancellationToken.ThrowIfCancellationRequested(); - - if (requestContinuation == null) - { - SqlQuerySpec sqlQuerySpecForInit = new SqlQuerySpec( - sqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: True), - sqlQuerySpec.Parameters); - - TryCatch tryInitialize = await base.TryInitializeAsync( - collectionRid, - partitionKeyRanges, - initialPageSize, - sqlQuerySpecForInit, - cancellationToken: cancellationToken, - targetRangeToContinuationMap: null, - deferFirstPage: false, - filter: null, - tryFilterAsync: null); - if (!tryInitialize.Succeeded) - { - return tryInitialize; - } - } - else - { - TryCatch tryExtractContinuationTokens = CosmosOrderByItemQueryExecutionContext.TryExtractContinuationTokens( - requestContinuation, - sortOrders, - orderByExpressions); - if (!tryExtractContinuationTokens.Succeeded) - { - return TryCatch.FromException(tryExtractContinuationTokens.Exception); - } - - TryCatch tryGetOrderByInitInfo = CosmosOrderByItemQueryExecutionContext.TryGetOrderByPartitionKeyRangesInitializationInfo( - tryExtractContinuationTokens.Result, - partitionKeyRanges, - sortOrders, - orderByExpressions); - if (!tryGetOrderByInitInfo.Succeeded) - { - return TryCatch.FromException(tryGetOrderByInitInfo.Exception); - } - - OrderByInitInfo initiaizationInfo = tryGetOrderByInitInfo.Result; - RangeFilterInitializationInfo[] orderByInfos = initiaizationInfo.Filters; - IReadOnlyDictionary targetRangeToOrderByContinuationMap = initiaizationInfo.ContinuationTokens; - Debug.Assert( - targetRangeToOrderByContinuationMap != null, - "If targetRangeToOrderByContinuationMap can't be null is valid continuation is supplied"); - - // For ascending order-by, left of target partition has filter expression > value, - // right of target partition has filter expression >= value, - // and target partition takes the previous filter from continuation (or true if no continuation) - foreach (RangeFilterInitializationInfo info in orderByInfos) - { - if (info.StartIndex > info.EndIndex) - { - continue; - } - - PartialReadOnlyList partialRanges = - new PartialReadOnlyList( - partitionKeyRanges, - info.StartIndex, - info.EndIndex - info.StartIndex + 1); - - SqlQuerySpec sqlQuerySpecForInit = new SqlQuerySpec( - sqlQuerySpec.QueryText.Replace(FormatPlaceHolder, info.Filter), - sqlQuerySpec.Parameters); - - TryCatch tryInitialize = await base.TryInitializeAsync( - collectionRid, - partialRanges, - initialPageSize, - sqlQuerySpecForInit, - targetRangeToOrderByContinuationMap.ToDictionary( - kvp => kvp.Key, - kvp => kvp.Value.CompositeContinuationToken.Token), - false, - info.Filter, - async (itemProducerTree) => - { - if (targetRangeToOrderByContinuationMap.TryGetValue( - itemProducerTree.Root.PartitionKeyRange.Id, - out OrderByContinuationToken continuationToken)) - { - TryCatch tryFilter = await this.TryFilterAsync( - itemProducerTree, - sortOrders, - continuationToken, - cancellationToken); - - if (!tryFilter.Succeeded) - { - return tryFilter; - } - } - - return TryCatch.FromResult(); - }, - cancellationToken); - if (!tryInitialize.Succeeded) - { - return tryInitialize; - } - } - } - - return TryCatch.FromResult(); - } - - private static TryCatch TryExtractContinuationTokens( - CosmosElement requestContinuation, - SortOrder[] sortOrders, - string[] orderByExpressions) - { - Debug.Assert( - !(orderByExpressions == null - || orderByExpressions.Length <= 0 - || sortOrders == null - || sortOrders.Length <= 0 - || orderByExpressions.Length != sortOrders.Length), - "Partitioned QueryExecutionInfo returned bogus results."); - - if (requestContinuation == null) - { - throw new ArgumentNullException("continuation can not be null or empty."); - } - - if (!(requestContinuation is CosmosArray cosmosArray)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException($"Order by continuation token must be an array: {requestContinuation}.")); - } - - List orderByContinuationTokens = new List(); - foreach (CosmosElement arrayItem in cosmosArray) - { - TryCatch tryCreateOrderByContinuationToken = OrderByContinuationToken.TryCreateFromCosmosElement(arrayItem); - if (!tryCreateOrderByContinuationToken.Succeeded) - { - return TryCatch.FromException(tryCreateOrderByContinuationToken.Exception); - } - - orderByContinuationTokens.Add(tryCreateOrderByContinuationToken.Result); - } - - if (orderByContinuationTokens.Count == 0) - { - return TryCatch.FromException( - new MalformedContinuationTokenException($"Order by continuation token cannot be empty: {requestContinuation}.")); - } - - foreach (OrderByContinuationToken suppliedOrderByContinuationToken in orderByContinuationTokens) - { - if (suppliedOrderByContinuationToken.OrderByItems.Count != sortOrders.Length) - { - return TryCatch.FromException( - new MalformedContinuationTokenException($"Invalid order-by items in continuation token {requestContinuation} for OrderBy~Context.")); - } - } - - return TryCatch.FromResult(orderByContinuationTokens.ToArray()); - } - - /// - /// When resuming an order by query we need to filter the document producers. - /// - /// The producer to filter down. - /// The sort orders. - /// The continuation token. - /// The cancellation token. - /// A task to await on. - private async Task TryFilterAsync( - ItemProducerTree producer, - SortOrder[] sortOrders, - OrderByContinuationToken continuationToken, - CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - // When we resume a query on a partition there is a possibility that we only read a partial page from the backend - // meaning that will we repeat some documents if we didn't do anything about it. - // The solution is to filter all the documents that come before in the sort order, since we have already emitted them to the client. - // The key is to seek until we get an order by value that matches the order by value we left off on. - // Once we do that we need to seek to the correct _rid within the term, - // since there might be many documents with the same order by value we left off on. - - foreach (ItemProducerTree tree in producer) - { - if (!ResourceId.TryParse(continuationToken.Rid, out ResourceId continuationRid)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException($"Invalid Rid in the continuation token {continuationToken.CompositeContinuationToken.Token} for OrderBy~Context.")); - } - - Dictionary resourceIds = new Dictionary(); - int itemToSkip = continuationToken.SkipCount; - bool continuationRidVerified = false; - - while (true) - { - if (tree.Current == null) - { - // This document producer doesn't have anymore items. - break; - } - - OrderByQueryResult orderByResult = new OrderByQueryResult(tree.Current); - // Throw away documents until it matches the item from the continuation token. - int cmp = 0; - for (int i = 0; i < sortOrders.Length; ++i) - { - cmp = ItemComparer.Instance.Compare( - continuationToken.OrderByItems[i].Item, - orderByResult.OrderByItems[i].Item); - - if (cmp != 0) - { - cmp = sortOrders[i] != SortOrder.Descending ? cmp : -cmp; - break; - } - } - - if (cmp < 0) - { - // We might have passed the item due to deletions and filters. - break; - } - - if (cmp == 0) - { - if (!resourceIds.TryGetValue(orderByResult.Rid, out ResourceId rid)) - { - if (!ResourceId.TryParse(orderByResult.Rid, out rid)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Invalid Rid in the continuation token {continuationToken.CompositeContinuationToken.Token} for OrderBy~Context~TryParse.")); - } - - resourceIds.Add(orderByResult.Rid, rid); - } - - if (!continuationRidVerified) - { - if (continuationRid.Database != rid.Database || continuationRid.DocumentCollection != rid.DocumentCollection) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Invalid Rid in the continuation token {continuationToken.CompositeContinuationToken.Token} for OrderBy~Context.")); - } - - continuationRidVerified = true; - } - - // Once the item matches the order by items from the continuation tokens - // We still need to remove all the documents that have a lower rid in the rid sort order. - // If there is a tie in the sort order the documents should be in _rid order in the same direction as the first order by field. - // So if it's ORDER BY c.age ASC, c.name DESC the _rids are ASC - // If ti's ORDER BY c.age DESC, c.name DESC the _rids are DESC - cmp = continuationRid.Document.CompareTo(rid.Document); - if (sortOrders[0] == SortOrder.Descending) - { - cmp = -cmp; - } - - // We might have passed the item due to deletions and filters. - // We also have a skip count for JOINs - if (cmp < 0 || (cmp == 0 && itemToSkip-- <= 0)) - { - break; - } - } - - if (!tree.TryMoveNextDocumentWithinPage()) - { - while (true) - { - (bool successfullyMovedNext, QueryResponseCore? failureResponse) = await tree.TryMoveNextPageAsync(cancellationToken); - if (!successfullyMovedNext) - { - if (failureResponse.HasValue) - { - return TryCatch.FromException( - failureResponse.Value.CosmosException); - } - - break; - } - - if (tree.IsAtBeginningOfPage) - { - break; - } - - if (tree.TryMoveNextDocumentWithinPage()) - { - break; - } - } - } - } - } - - return TryCatch.FromResult(); - } - - /// - /// Gets the filters for every partition. - /// - private static TryCatch TryGetOrderByPartitionKeyRangesInitializationInfo( - OrderByContinuationToken[] suppliedContinuationTokens, - List partitionKeyRanges, - SortOrder[] sortOrders, - string[] orderByExpressions) - { - TryCatch> tryFindRangeAndContinuationTokensMonad = CosmosCrossPartitionQueryExecutionContext.TryFindTargetRangeAndExtractContinuationTokens( - partitionKeyRanges, - suppliedContinuationTokens - .Select(token => Tuple.Create(token, token.CompositeContinuationToken.Range))); - - return tryFindRangeAndContinuationTokensMonad.Try((indexAndContinuationTokens) => - { - int minIndex = indexAndContinuationTokens.TargetIndex; - IReadOnlyDictionary partitionKeyRangeToContinuationToken = indexAndContinuationTokens.ContinuationTokens; - - FormattedFilterInfo formattedFilterInfo = CosmosOrderByItemQueryExecutionContext.GetFormattedFilters( - orderByExpressions, - suppliedContinuationTokens, - sortOrders); - - RangeFilterInitializationInfo[] filters = new RangeFilterInitializationInfo[] - { - new RangeFilterInitializationInfo( - filter: formattedFilterInfo.FilterForRangesLeftOfTargetRanges, - startIndex: 0, - endIndex: minIndex - 1), - new RangeFilterInitializationInfo( - filter: formattedFilterInfo.FiltersForTargetRange, - startIndex: minIndex, - endIndex: minIndex), - new RangeFilterInitializationInfo( - filter: formattedFilterInfo.FilterForRangesRightOfTargetRanges, - startIndex: minIndex + 1, - endIndex: partitionKeyRanges.Count - 1), - }; - - return new OrderByInitInfo( - filters, - partitionKeyRangeToContinuationToken); - }); - } - - /// - /// Gets the formatted filters for every partition. - /// - /// The filter expressions. - /// The continuation token. - /// The sort orders. - /// The formatted filters for every partition. - private static FormattedFilterInfo GetFormattedFilters( - string[] expressions, - OrderByContinuationToken[] continuationTokens, - SortOrder[] sortOrders) - { - // Validate the inputs - for (int index = 0; index < continuationTokens.Length; index++) - { - Debug.Assert(continuationTokens[index].OrderByItems.Count == sortOrders.Length, "Expect values and orders are the same size."); - Debug.Assert(expressions.Length == sortOrders.Length, "Expect expressions and orders are the same size."); - } - - Tuple filters = CosmosOrderByItemQueryExecutionContext.GetFormattedFilters( - expressions, - continuationTokens[0].OrderByItems.Select(orderByItem => orderByItem.Item).ToArray(), - sortOrders); - - return new FormattedFilterInfo(filters.Item1, filters.Item2, filters.Item3); - } - - private static void AppendToBuilders(Tuple builders, object str) - { - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, str, str, str); - } - - private static void AppendToBuilders(Tuple builders, object left, object target, object right) - { - builders.Item1.Append(left); - builders.Item2.Append(target); - builders.Item3.Append(right); - } - - private static Tuple GetFormattedFilters( - string[] expressions, - CosmosElement[] orderByItems, - SortOrder[] sortOrders) - { - // When we run cross partition queries, - // we only serialize the continuation token for the partition that we left off on. - // The only problem is that when we resume the order by query, - // we don't have continuation tokens for all other partition. - // The saving grace is that the data has a composite sort order(query sort order, partition key range id) - // so we can generate range filters which in turn the backend will turn into rid based continuation tokens, - // which is enough to get the streams of data flowing from all partitions. - // The details of how this is done is described below: - int numOrderByItems = expressions.Length; - bool isSingleOrderBy = numOrderByItems == 1; - StringBuilder left = new StringBuilder(); - StringBuilder target = new StringBuilder(); - StringBuilder right = new StringBuilder(); - - Tuple builders = new Tuple(left, right, target); - - if (isSingleOrderBy) - { - //For a single order by query we resume the continuations in this manner - // Suppose the query is SELECT* FROM c ORDER BY c.string ASC - // And we left off on partition N with the value "B" - // Then - // All the partitions to the left will have finished reading "B" - // Partition N is still reading "B" - // All the partitions to the right have let to read a "B - // Therefore the filters should be - // > "B" , >= "B", and >= "B" respectively - // Repeat the same logic for DESC and you will get - // < "B", <= "B", and <= "B" respectively - // The general rule becomes - // For ASC - // > for partitions to the left - // >= for the partition we left off on - // >= for the partitions to the right - // For DESC - // < for partitions to the left - // <= for the partition we left off on - // <= for the partitions to the right - string expression = expressions.First(); - SortOrder sortOrder = sortOrders.First(); - CosmosElement orderByItem = orderByItems.First(); - StringBuilder sb = new StringBuilder(); - CosmosElementToQueryLiteral cosmosElementToQueryLiteral = new CosmosElementToQueryLiteral(sb); - orderByItem.Accept(cosmosElementToQueryLiteral); - string orderByItemToString = sb.ToString(); - left.Append($"{expression} {(sortOrder == SortOrder.Descending ? "<" : ">")} {orderByItemToString}"); - target.Append($"{expression} {(sortOrder == SortOrder.Descending ? "<=" : ">=")} {orderByItemToString}"); - right.Append($"{expression} {(sortOrder == SortOrder.Descending ? "<=" : ">=")} {orderByItemToString}"); - } - else - { - //For a multi order by query - // Suppose the query is SELECT* FROM c ORDER BY c.string ASC, c.number ASC - // And we left off on partition N with the value("A", 1) - // Then - // All the partitions to the left will have finished reading("A", 1) - // Partition N is still reading("A", 1) - // All the partitions to the right have let to read a "(A", 1) - // The filters are harder to derive since their are multiple columns - // But the problem reduces to "How do you know one document comes after another in a multi order by query" - // The answer is to just look at it one column at a time. - // For this particular scenario: - // If a first column is greater ex. ("B", blah), then the document comes later in the sort order - // Therefore we want all documents where the first column is greater than "A" which means > "A" - // Or if the first column is a tie, then you look at the second column ex. ("A", blah). - // Therefore we also want all documents where the first column was a tie but the second column is greater which means = "A" AND > 1 - // Therefore the filters should be - // (> "A") OR (= "A" AND > 1), (> "A") OR (= "A" AND >= 1), (> "A") OR (= "A" AND >= 1) - // Notice that if we repeated the same logic we for single order by we would have gotten - // > "A" AND > 1, >= "A" AND >= 1, >= "A" AND >= 1 - // which is wrong since we missed some documents - // Repeat the same logic for ASC, DESC - // (> "A") OR (= "A" AND < 1), (> "A") OR (= "A" AND <= 1), (> "A") OR (= "A" AND <= 1) - // Again for DESC, ASC - // (< "A") OR (= "A" AND > 1), (< "A") OR (= "A" AND >= 1), (< "A") OR (= "A" AND >= 1) - // And again for DESC DESC - // (< "A") OR (= "A" AND < 1), (< "A") OR (= "A" AND <= 1), (< "A") OR (= "A" AND <= 1) - // The general we look at all prefixes of the order by columns to look for tie breakers. - // Except for the full prefix whose last column follows the rules for single item order by - // And then you just OR all the possibilities together - for (int prefixLength = 1; prefixLength <= numOrderByItems; prefixLength++) - { - ArraySegment expressionPrefix = new ArraySegment(expressions, 0, prefixLength); - ArraySegment sortOrderPrefix = new ArraySegment(sortOrders, 0, prefixLength); - ArraySegment orderByItemsPrefix = new ArraySegment(orderByItems, 0, prefixLength); - - bool lastPrefix = prefixLength == numOrderByItems; - bool firstPrefix = prefixLength == 1; - - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, "("); - - for (int index = 0; index < prefixLength; index++) - { - string expression = expressionPrefix.ElementAt(index); - SortOrder sortOrder = sortOrderPrefix.ElementAt(index); - CosmosElement orderByItem = orderByItemsPrefix.ElementAt(index); - bool lastItem = index == prefixLength - 1; - - // Append Expression - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, expression); - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " "); - - // Append binary operator - if (lastItem) - { - string inequality = sortOrder == SortOrder.Descending ? "<" : ">"; - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, inequality); - if (lastPrefix) - { - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, string.Empty, "=", "="); - } - } - else - { - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, "="); - } - - // Append SortOrder - StringBuilder sb = new StringBuilder(); - CosmosElementToQueryLiteral cosmosElementToQueryLiteral = new CosmosElementToQueryLiteral(sb); - orderByItem.Accept(cosmosElementToQueryLiteral); - string orderByItemToString = sb.ToString(); - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " "); - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, orderByItemToString); - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " "); - - if (!lastItem) - { - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, "AND "); - } - } - - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, ")"); - if (!lastPrefix) - { - CosmosOrderByItemQueryExecutionContext.AppendToBuilders(builders, " OR "); - } - } - } - - return new Tuple(left.ToString(), target.ToString(), right.ToString()); - } - - public override CosmosElement GetCosmosElementContinuationToken() - { - IEnumerable activeItemProducers = this.GetActiveItemProducers(); - if (!activeItemProducers.Any()) - { - return default; - } - - List orderByContinuationTokens = new List(); - foreach (ItemProducer activeItemProducer in activeItemProducers) - { - OrderByQueryResult orderByQueryResult = new OrderByQueryResult(activeItemProducer.Current); - OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( - compositeContinuationToken: new CompositeContinuationToken() - { - Token = activeItemProducer.PreviousContinuationToken, - Range = new Documents.Routing.Range( - min: activeItemProducer.PartitionKeyRange.MinInclusive, - max: activeItemProducer.PartitionKeyRange.MaxExclusive, - isMinInclusive: true, - isMaxInclusive: false) - }, - orderByItems: orderByQueryResult.OrderByItems, - rid: orderByQueryResult.Rid, - skipCount: this.ShouldIncrementSkipCount(activeItemProducer) ? this.skipCount + 1 : 0, - filter: activeItemProducer.Filter); - - CosmosElement cosmosElementToken = OrderByContinuationToken.ToCosmosElement(orderByContinuationToken); - orderByContinuationTokens.Add(cosmosElementToken); - } - - return CosmosArray.Create(orderByContinuationTokens); - } - - private readonly struct OrderByInitInfo - { - public OrderByInitInfo(RangeFilterInitializationInfo[] filters, IReadOnlyDictionary continuationTokens) - { - this.Filters = filters; - this.ContinuationTokens = continuationTokens; - } - - public RangeFilterInitializationInfo[] Filters { get; } - - public IReadOnlyDictionary ContinuationTokens { get; } - } - - /// - /// Struct to hold all the filters for every partition. - /// - private readonly struct FormattedFilterInfo - { - /// - /// Filters for current partition. - /// - public readonly string FiltersForTargetRange; - - /// - /// Filters for partitions left of the current partition. - /// - public readonly string FilterForRangesLeftOfTargetRanges; - - /// - /// Filters for partitions right of the current partition. - /// - public readonly string FilterForRangesRightOfTargetRanges; - - /// - /// Initializes a new instance of the FormattedFilterInfo struct. - /// - /// The filters for the partitions left of the current partition. - /// The filters for the current partition. - /// The filters for the partitions right of the current partition. - public FormattedFilterInfo(string leftFilter, string targetFilter, string rightFilters) - { - this.FilterForRangesLeftOfTargetRanges = leftFilter; - this.FiltersForTargetRange = targetFilter; - this.FilterForRangesRightOfTargetRanges = rightFilters; - } - } - - /// - /// Equality comparer used to determine if a document producer needs it's continuation token returned. - /// Basically just says that the continuation token can be flushed once you stop seeing duplicates. - /// - private sealed class OrderByEqualityComparer : IEqualityComparer - { - /// - /// The order by comparer. - /// - private readonly OrderByItemProducerTreeComparer orderByConsumeComparer; - - /// - /// Initializes a new instance of the OrderByEqualityComparer class. - /// - /// The order by consume comparer. - public OrderByEqualityComparer(OrderByItemProducerTreeComparer orderByConsumeComparer) - { - this.orderByConsumeComparer = orderByConsumeComparer ?? throw new ArgumentNullException($"{nameof(orderByConsumeComparer)} can not be null."); - } - - /// - /// Gets whether two OrderByQueryResult instances are equal. - /// - /// The first. - /// The second. - /// Whether two OrderByQueryResult instances are equal. - public bool Equals(CosmosElement x, CosmosElement y) - { - OrderByQueryResult orderByQueryResultX = new OrderByQueryResult(x); - OrderByQueryResult orderByQueryResultY = new OrderByQueryResult(y); - return this.orderByConsumeComparer.CompareOrderByItems( - orderByQueryResultX.OrderByItems, - orderByQueryResultY.OrderByItems) == 0; - } - - /// - /// Gets the hash code for object. - /// - /// The object to hash. - /// The hash code for the OrderByQueryResult object. - public int GetHashCode(CosmosElement obj) - { - return 0; - } - } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.ContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.ContinuationToken.cs new file mode 100644 index 0000000000..0380c994e9 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.ContinuationToken.cs @@ -0,0 +1,111 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.Parallel +{ + using System.Collections.Generic; + using System.Linq; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; + using Newtonsoft.Json; + + /// + /// CosmosParallelItemQueryExecutionContext is a concrete implementation for CrossPartitionQueryExecutionContext. + /// This class is responsible for draining cross partition queries that do not have order by conditions. + /// The way parallel queries work is that it drains from the left most partition first. + /// This class handles draining in the correct order and can also stop and resume the query + /// by generating a continuation token and resuming from said continuation token. + /// + internal sealed partial class CosmosParallelItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext + { + /// + /// For parallel queries the continuation token semantically holds two pieces of information: + /// 1) What physical partition did the user read up to + /// 2) How far into said partition did they read up to + /// And since the client consumes queries strictly in a left to right order we can partition the documents: + /// 1) Documents left of the continuation token have been drained + /// 2) Documents to the right of the continuation token still need to be served. + /// This is useful since we can have a single continuation token for all partitions. + /// + protected override string ContinuationToken + { + get + { + IEnumerable activeItemProducers = this.GetActiveItemProducers(); + string continuationToken; + if (activeItemProducers.Any()) + { + IEnumerable compositeContinuationTokens = activeItemProducers.Select((documentProducer) => new CompositeContinuationToken + { + Token = documentProducer.CurrentContinuationToken, + Range = documentProducer.PartitionKeyRange.ToRange() + }); + continuationToken = JsonConvert.SerializeObject(compositeContinuationTokens, DefaultJsonSerializationSettings.Value); + } + else + { + continuationToken = null; + } + + return continuationToken; + } + } + + public override CosmosElement GetCosmosElementContinuationToken() + { + IEnumerable activeItemProducers = this.GetActiveItemProducers(); + if (!activeItemProducers.Any()) + { + return default; + } + + List compositeContinuationTokens = new List(); + foreach (ItemProducer activeItemProducer in activeItemProducers) + { + CompositeContinuationToken compositeToken = new CompositeContinuationToken() + { + Token = activeItemProducer.CurrentContinuationToken, + Range = new Documents.Routing.Range( + min: activeItemProducer.PartitionKeyRange.MinInclusive, + max: activeItemProducer.PartitionKeyRange.MaxExclusive, + isMinInclusive: true, + isMaxInclusive: false) + }; + + CosmosElement compositeContinuationToken = CompositeContinuationToken.ToCosmosElement(compositeToken); + compositeContinuationTokens.Add(compositeContinuationToken); + } + + return CosmosArray.Create(compositeContinuationTokens); + } + + /// + /// Comparer used to determine if we should return the continuation token to the user + /// + /// This basically just says that the two object are never equals, so that we don't return a continuation for a partition we have started draining. + private sealed class ParallelEqualityComparer : IEqualityComparer + { + /// + /// Returns whether two parallel query items are equal. + /// + /// The first item. + /// The second item. + /// Whether two parallel query items are equal. + public bool Equals(CosmosElement x, CosmosElement y) + { + return x == y; + } + + /// + /// Gets the hash code of an object. + /// + /// The object to hash. + /// The hash code for the object. + public int GetHashCode(CosmosElement obj) + { + return obj == null ? 0 : obj.GetHashCode(); + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.Drain.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.Drain.cs new file mode 100644 index 0000000000..45845fa4d8 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.Drain.cs @@ -0,0 +1,72 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.Parallel +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + + /// + /// CosmosParallelItemQueryExecutionContext is a concrete implementation for CrossPartitionQueryExecutionContext. + /// This class is responsible for draining cross partition queries that do not have order by conditions. + /// The way parallel queries work is that it drains from the left most partition first. + /// This class handles draining in the correct order and can also stop and resume the query + /// by generating a continuation token and resuming from said continuation token. + /// + internal sealed partial class CosmosParallelItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext + { + public override async Task DrainAsync(int maxElements, CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + // In order to maintain the continuation token for the user we must drain with a few constraints + // 1) We fully drain from the left most partition before moving on to the next partition + // 2) We drain only full pages from the document producer so we aren't left with a partial page + // otherwise we would need to add to the continuation token how many items to skip over on that page. + + // Only drain from the leftmost (current) document producer tree + ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree(); + List results = new List(); + try + { + (bool gotNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken); + if (failureResponse != null) + { + return failureResponse.Value; + } + + if (gotNextPage) + { + int itemsLeftInCurrentPage = currentItemProducerTree.ItemsLeftInCurrentPage; + + // Only drain full pages or less if this is a top query. + currentItemProducerTree.TryMoveNextDocumentWithinPage(); + int numberOfItemsToDrain = Math.Min(itemsLeftInCurrentPage, maxElements); + for (int i = 0; i < numberOfItemsToDrain; i++) + { + results.Add(currentItemProducerTree.Current); + currentItemProducerTree.TryMoveNextDocumentWithinPage(); + } + } + } + finally + { + this.PushCurrentItemProducerTree(currentItemProducerTree); + } + + return QueryResponseCore.CreateSuccess( + result: results, + requestCharge: this.requestChargeTracker.GetAndResetCharge(), + activityId: null, + responseLengthBytes: this.GetAndResetResponseLengthBytes(), + disallowContinuationTokenMessage: null, + continuationToken: this.ContinuationToken, + diagnostics: this.GetAndResetDiagnostics()); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.Resume.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.Resume.cs new file mode 100644 index 0000000000..0ad350287f --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.Resume.cs @@ -0,0 +1,181 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.Parallel +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; + using Microsoft.Azure.Cosmos.Query.Core.Exceptions; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + using PartitionKeyRange = Documents.PartitionKeyRange; + + internal sealed partial class CosmosParallelItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext + { + public static async Task> TryCreateAsync( + CosmosQueryContext queryContext, + CosmosCrossPartitionQueryExecutionContext.CrossPartitionInitParams initParams, + CosmosElement requestContinuationToken, + CancellationToken cancellationToken) + { + if (queryContext == null) + { + throw new ArgumentNullException(nameof(queryContext)); + } + + cancellationToken.ThrowIfCancellationRequested(); + + IComparer moveNextComparer; + if (initParams.ReturnResultsInDeterministicOrder) + { + moveNextComparer = DeterministicParallelItemProducerTreeComparer.Singleton; + } + else + { + moveNextComparer = NonDeterministicParallelItemProducerTreeComparer.Singleton; + } + + CosmosParallelItemQueryExecutionContext context = new CosmosParallelItemQueryExecutionContext( + queryContext: queryContext, + maxConcurrency: initParams.MaxConcurrency, + maxItemCount: initParams.MaxItemCount, + maxBufferedItemCount: initParams.MaxBufferedItemCount, + moveNextComparer: moveNextComparer, + returnResultsInDeterministicOrder: initParams.ReturnResultsInDeterministicOrder, + testSettings: initParams.TestSettings); + + return (await context.TryInitializeAsync( + sqlQuerySpec: initParams.SqlQuerySpec, + collectionRid: initParams.CollectionRid, + partitionKeyRanges: initParams.PartitionKeyRanges, + initialPageSize: initParams.InitialPageSize, + requestContinuation: requestContinuationToken, + cancellationToken: cancellationToken)).Try(x => x); + } + + /// + /// Initialize the execution context. + /// + /// SQL query spec. + /// The collection rid. + /// The partition key ranges to drain documents from. + /// The initial page size. + /// The continuation token to resume from. + /// The cancellation token. + /// A task to await on. + private async Task> TryInitializeAsync( + SqlQuerySpec sqlQuerySpec, + string collectionRid, + IReadOnlyList partitionKeyRanges, + int initialPageSize, + CosmosElement requestContinuation, + CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + TryCatch> tryGetPartitionKeyRangeToCompositeContinuationToken = CosmosParallelItemQueryExecutionContext.TryGetPartitionKeyRangeToCompositeContinuationToken( + partitionKeyRanges, + requestContinuation); + if (!tryGetPartitionKeyRangeToCompositeContinuationToken.Succeeded) + { + return TryCatch.FromException(tryGetPartitionKeyRangeToCompositeContinuationToken.Exception); + } + + List> rangesToInitialize = new List>() + { + // Skip all the partitions left of the target range, since they have already been drained fully. + tryGetPartitionKeyRangeToCompositeContinuationToken.Result.TargetPartition, + tryGetPartitionKeyRangeToCompositeContinuationToken.Result.PartitionsRightOfTarget, + }; + + foreach (IReadOnlyDictionary rangeToCompositeToken in rangesToInitialize) + { + IReadOnlyDictionary partitionKeyRangeToContinuationToken = rangeToCompositeToken + .ToDictionary(kvp => kvp.Key, kvp => kvp.Value?.Token); + TryCatch tryInitialize = await base.TryInitializeAsync( + collectionRid, + initialPageSize, + sqlQuerySpec, + partitionKeyRangeToContinuationToken, + deferFirstPage: true, + filter: null, + tryFilterAsync: null, + cancellationToken); + if (!tryInitialize.Succeeded) + { + return TryCatch.FromException(tryInitialize.Exception); + } + } + + return TryCatch.FromResult(this); + } + + private static TryCatch> TryGetPartitionKeyRangeToCompositeContinuationToken( + IReadOnlyList partitionKeyRanges, + CosmosElement continuationToken) + { + if (continuationToken == null) + { + Dictionary dictionary = new Dictionary(); + foreach (PartitionKeyRange partitionKeyRange in partitionKeyRanges) + { + dictionary.Add(key: partitionKeyRange, value: null); + } + + return TryCatch>.FromResult( + new PartitionMapping( + partitionsLeftOfTarget: new Dictionary(), + targetPartition: dictionary, + partitionsRightOfTarget: new Dictionary())); + } + + TryCatch> tryParseCompositeContinuationTokens = TryParseCompositeContinuationList(continuationToken); + if (!tryParseCompositeContinuationTokens.Succeeded) + { + return TryCatch>.FromException(tryParseCompositeContinuationTokens.Exception); + } + + return CosmosCrossPartitionQueryExecutionContext.TryGetInitializationInfo( + partitionKeyRanges, + tryParseCompositeContinuationTokens.Result); + } + + private static TryCatch> TryParseCompositeContinuationList( + CosmosElement requestContinuationToken) + { + if (requestContinuationToken == null) + { + throw new ArgumentNullException(nameof(requestContinuationToken)); + } + + if (!(requestContinuationToken is CosmosArray compositeContinuationTokenListRaw)) + { + return TryCatch>.FromException( + new MalformedContinuationTokenException( + $"Invalid format for continuation token {requestContinuationToken} for {nameof(CosmosParallelItemQueryExecutionContext)}")); + } + + List compositeContinuationTokens = new List(); + foreach (CosmosElement compositeContinuationTokenRaw in compositeContinuationTokenListRaw) + { + TryCatch tryCreateCompositeContinuationToken = CompositeContinuationToken.TryCreateFromCosmosElement(compositeContinuationTokenRaw); + if (!tryCreateCompositeContinuationToken.Succeeded) + { + return TryCatch>.FromException(tryCreateCompositeContinuationToken.Exception); + } + + compositeContinuationTokens.Add(tryCreateCompositeContinuationToken.Result); + } + + return TryCatch>.FromResult(compositeContinuationTokens); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.cs index a3d3553b83..56a2f6225e 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/Parallel/CosmosParallelItemQueryExecutionContext.cs @@ -5,22 +5,9 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.Parallel { using System; using System.Collections.Generic; - using System.Diagnostics; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Json; - using Microsoft.Azure.Cosmos.Query.Core; - using Microsoft.Azure.Cosmos.Query.Core.Collections; - using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; - using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.QueryClient; - using Newtonsoft.Json; - using PartitionKeyRange = Documents.PartitionKeyRange; /// /// CosmosParallelItemQueryExecutionContext is a concrete implementation for CrossPartitionQueryExecutionContext. @@ -29,20 +16,20 @@ namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.Parallel /// This class handles draining in the correct order and can also stop and resume the query /// by generating a continuation token and resuming from said continuation token. /// - internal sealed class CosmosParallelItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext + internal sealed partial class CosmosParallelItemQueryExecutionContext : CosmosCrossPartitionQueryExecutionContext { /// - /// The function to determine which partition to fetch from first. + /// The comparer used to determine, which continuation tokens should be returned to the user. /// - private static readonly Func FetchPriorityFunction = documentProducerTree => int.Parse(documentProducerTree.PartitionKeyRange.Id); + private static readonly IEqualityComparer EqualityComparer = new ParallelEqualityComparer(); /// - /// The comparer used to determine, which continuation tokens should be returned to the user. + /// The function to determine which partition to fetch from first. /// - private static readonly IEqualityComparer EqualityComparer = new ParallelEqualityComparer(); + private static readonly Func FetchPriorityFunction = documentProducerTree => int.Parse(documentProducerTree.PartitionKeyRange.Id); private readonly bool returnResultsInDeterministicOrder; - + /// /// Initializes a new instance of the CosmosParallelItemQueryExecutionContext class. /// @@ -74,303 +61,5 @@ private CosmosParallelItemQueryExecutionContext( { this.returnResultsInDeterministicOrder = returnResultsInDeterministicOrder; } - - /// - /// For parallel queries the continuation token semantically holds two pieces of information: - /// 1) What physical partition did the user read up to - /// 2) How far into said partition did they read up to - /// And since the client consumes queries strictly in a left to right order we can partition the documents: - /// 1) Documents left of the continuation token have been drained - /// 2) Documents to the right of the continuation token still need to be served. - /// This is useful since we can have a single continuation token for all partitions. - /// - protected override string ContinuationToken - { - get - { - IEnumerable activeItemProducers = this.GetActiveItemProducers(); - string continuationToken; - if (activeItemProducers.Any()) - { - IEnumerable compositeContinuationTokens = activeItemProducers.Select((documentProducer) => new CompositeContinuationToken - { - Token = documentProducer.CurrentContinuationToken, - Range = documentProducer.PartitionKeyRange.ToRange() - }); - continuationToken = JsonConvert.SerializeObject(compositeContinuationTokens, DefaultJsonSerializationSettings.Value); - } - else - { - continuationToken = null; - } - - return continuationToken; - } - } - - public static async Task> TryCreateAsync( - CosmosQueryContext queryContext, - CosmosCrossPartitionQueryExecutionContext.CrossPartitionInitParams initParams, - CosmosElement requestContinuationToken, - CancellationToken cancellationToken) - { - Debug.Assert( - !initParams.PartitionedQueryExecutionInfo.QueryInfo.HasOrderBy, - "Parallel~Context must not have order by query info."); - - cancellationToken.ThrowIfCancellationRequested(); - - IComparer moveNextComparer; - if (initParams.ReturnResultsInDeterministicOrder) - { - moveNextComparer = DeterministicParallelItemProducerTreeComparer.Singleton; - } - else - { - moveNextComparer = NonDeterministicParallelItemProducerTreeComparer.Singleton; - } - - CosmosParallelItemQueryExecutionContext context = new CosmosParallelItemQueryExecutionContext( - queryContext: queryContext, - maxConcurrency: initParams.MaxConcurrency, - maxItemCount: initParams.MaxItemCount, - maxBufferedItemCount: initParams.MaxBufferedItemCount, - moveNextComparer: moveNextComparer, - returnResultsInDeterministicOrder: initParams.ReturnResultsInDeterministicOrder, - testSettings: initParams.TestSettings); - - return (await context.TryInitializeAsync( - sqlQuerySpec: initParams.SqlQuerySpec, - collectionRid: initParams.CollectionRid, - partitionKeyRanges: initParams.PartitionKeyRanges, - initialPageSize: initParams.InitialPageSize, - requestContinuation: requestContinuationToken, - cancellationToken: cancellationToken)).Try(x => x); - } - - public override async Task DrainAsync(int maxElements, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - // In order to maintain the continuation token for the user we must drain with a few constraints - // 1) We fully drain from the left most partition before moving on to the next partition - // 2) We drain only full pages from the document producer so we aren't left with a partial page - // otherwise we would need to add to the continuation token how many items to skip over on that page. - - // Only drain from the leftmost (current) document producer tree - ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree(); - List results = new List(); - try - { - (bool gotNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken); - if (failureResponse != null) - { - return failureResponse.Value; - } - - if (gotNextPage) - { - int itemsLeftInCurrentPage = currentItemProducerTree.ItemsLeftInCurrentPage; - - // Only drain full pages or less if this is a top query. - currentItemProducerTree.TryMoveNextDocumentWithinPage(); - int numberOfItemsToDrain = Math.Min(itemsLeftInCurrentPage, maxElements); - for (int i = 0; i < numberOfItemsToDrain; i++) - { - results.Add(currentItemProducerTree.Current); - currentItemProducerTree.TryMoveNextDocumentWithinPage(); - } - } - } - finally - { - this.PushCurrentItemProducerTree(currentItemProducerTree); - } - - return QueryResponseCore.CreateSuccess( - result: results, - requestCharge: this.requestChargeTracker.GetAndResetCharge(), - activityId: null, - responseLengthBytes: this.GetAndResetResponseLengthBytes(), - disallowContinuationTokenMessage: null, - continuationToken: this.ContinuationToken, - diagnostics: this.GetAndResetDiagnostics()); - } - - /// - /// Initialize the execution context. - /// - /// SQL query spec. - /// The collection rid. - /// The partition key ranges to drain documents from. - /// The initial page size. - /// The continuation token to resume from. - /// The cancellation token. - /// A task to await on. - private async Task> TryInitializeAsync( - SqlQuerySpec sqlQuerySpec, - string collectionRid, - List partitionKeyRanges, - int initialPageSize, - CosmosElement requestContinuation, - CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - TryCatch tryGetInitInfo = TryGetInitializationInfoFromContinuationToken( - partitionKeyRanges, - requestContinuation); - if (!tryGetInitInfo.Succeeded) - { - return TryCatch.FromException(tryGetInitInfo.Exception); - } - - ParallelInitInfo initializationInfo = tryGetInitInfo.Result; - IReadOnlyList filteredPartitionKeyRanges = initializationInfo.PartialRanges; - IReadOnlyDictionary targetIndicesForFullContinuation = initializationInfo.ContinuationTokens; - TryCatch tryInitialize = await base.TryInitializeAsync( - collectionRid, - filteredPartitionKeyRanges, - initialPageSize, - sqlQuerySpec, - targetIndicesForFullContinuation?.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.Token), - true, - null, - null, - cancellationToken); - if (!tryInitialize.Succeeded) - { - return TryCatch.FromException(tryInitialize.Exception); - } - - return TryCatch.FromResult(this); - } - - /// - /// Given a continuation token and a list of partitionKeyRanges this function will return a list of partition key ranges you should resume with. - /// Note that the output list is just a right hand slice of the input list, since we know that for any continuation of a parallel query it is just - /// resuming from the partition that the query left off that. - /// - /// The partition key ranges. - /// The continuation tokens that the user has supplied. - /// The subset of partition to actually target and continuation tokens. - private static TryCatch TryGetInitializationInfoFromContinuationToken( - List partitionKeyRanges, - CosmosElement continuationToken) - { - if (continuationToken == null) - { - return TryCatch.FromResult( - new ParallelInitInfo( - partitionKeyRanges, - null)); - } - - if (!(continuationToken is CosmosArray compositeContinuationTokenListRaw)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException($"Invalid format for continuation token {continuationToken} for {nameof(CosmosParallelItemQueryExecutionContext)}")); - } - - List compositeContinuationTokens = new List(); - foreach (CosmosElement compositeContinuationTokenRaw in compositeContinuationTokenListRaw) - { - TryCatch tryCreateCompositeContinuationToken = CompositeContinuationToken.TryCreateFromCosmosElement(compositeContinuationTokenRaw); - if (!tryCreateCompositeContinuationToken.Succeeded) - { - return TryCatch.FromException(tryCreateCompositeContinuationToken.Exception); - } - - compositeContinuationTokens.Add(tryCreateCompositeContinuationToken.Result); - } - - return CosmosCrossPartitionQueryExecutionContext.TryFindTargetRangeAndExtractContinuationTokens( - partitionKeyRanges, - compositeContinuationTokens.Select(token => Tuple.Create(token, token.Range))) - .Try((indexAndTokens) => - { - int minIndex = indexAndTokens.TargetIndex; - IReadOnlyDictionary rangeToToken = indexAndTokens.ContinuationTokens; - - // We know that all partitions to the left of the continuation token are fully drained so we can filter them out - IReadOnlyList filteredRanges = new PartialReadOnlyList( - partitionKeyRanges, - minIndex, - partitionKeyRanges.Count - minIndex); - - return new ParallelInitInfo( - filteredRanges, - rangeToToken); - }); - } - - public override CosmosElement GetCosmosElementContinuationToken() - { - IEnumerable activeItemProducers = this.GetActiveItemProducers(); - if (!activeItemProducers.Any()) - { - return default; - } - - List compositeContinuationTokens = new List(); - foreach (ItemProducer activeItemProducer in activeItemProducers) - { - CompositeContinuationToken compositeToken = new CompositeContinuationToken() - { - Token = activeItemProducer.CurrentContinuationToken, - Range = new Documents.Routing.Range( - min: activeItemProducer.PartitionKeyRange.MinInclusive, - max: activeItemProducer.PartitionKeyRange.MaxExclusive, - isMinInclusive: false, - isMaxInclusive: true) - }; - - CosmosElement compositeContinuationToken = CompositeContinuationToken.ToCosmosElement(compositeToken); - compositeContinuationTokens.Add(compositeContinuationToken); - } - - return CosmosArray.Create(compositeContinuationTokens); - } - - private readonly struct ParallelInitInfo - { - public ParallelInitInfo(IReadOnlyList partialRanges, IReadOnlyDictionary continuationTokens) - { - this.PartialRanges = partialRanges; - this.ContinuationTokens = continuationTokens; - } - - public IReadOnlyList PartialRanges { get; } - - public IReadOnlyDictionary ContinuationTokens { get; } - } - - /// - /// Comparer used to determine if we should return the continuation token to the user - /// - /// This basically just says that the two object are never equals, so that we don't return a continuation for a partition we have started draining. - private sealed class ParallelEqualityComparer : IEqualityComparer - { - /// - /// Returns whether two parallel query items are equal. - /// - /// The first item. - /// The second item. - /// Whether two parallel query items are equal. - public bool Equals(CosmosElement x, CosmosElement y) - { - return x == y; - } - - /// - /// Gets the hash code of an object. - /// - /// The object to hash. - /// The hash code for the object. - public int GetHashCode(CosmosElement obj) - { - return obj.GetHashCode(); - } - } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/ContinuationResumeLogicTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/ContinuationResumeLogicTests.cs new file mode 100644 index 0000000000..2bba535810 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/ContinuationResumeLogicTests.cs @@ -0,0 +1,352 @@ +namespace Microsoft.Azure.Cosmos.Query +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; + using static Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.CosmosCrossPartitionQueryExecutionContext; + + [TestClass] + public class ContinuationResumeLogicTests + { + [TestMethod] + public void TestMatchRangesTocontinuationTokens_OneToOne() + { + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + { + MinInclusive = string.Empty, + MaxExclusive = "FF", + Id = "0" + }; + + CompositeContinuationToken token = new CompositeContinuationToken() + { + Range = new Documents.Routing.Range( + min: string.Empty, + max: "FF", + isMinInclusive: true, + isMaxInclusive: false), + Token = "asdf" + }; + + IReadOnlyDictionary expectedMapping = new Dictionary() + { + { partitionKeyRange, token } + }; + + ContinuationResumeLogicTests.RunMatchRangesToContinuationTokens( + expectedMapping, + new PartitionKeyRange[] { partitionKeyRange }, + new CompositeContinuationToken[] { token }); + } + + [TestMethod] + public void TestMatchRangesTocontinuationTokens_OneToMany() + { + PartitionKeyRange partitionKeyRange1 = new PartitionKeyRange() + { + MinInclusive = string.Empty, + MaxExclusive = "A", + Id = "1" + }; + + PartitionKeyRange partitionKeyRange2 = new PartitionKeyRange() + { + MinInclusive = "A", + MaxExclusive = "B", + Id = "1" + }; + + CompositeContinuationToken token = new CompositeContinuationToken() + { + Range = new Documents.Routing.Range( + min: string.Empty, + max: "B", + isMinInclusive: true, + isMaxInclusive: false), + Token = "asdf" + }; + + IReadOnlyDictionary expectedMapping = new Dictionary() + { + { partitionKeyRange1, token }, + { partitionKeyRange2, token } + }; + + ContinuationResumeLogicTests.RunMatchRangesToContinuationTokens( + expectedMapping, + new PartitionKeyRange[] { partitionKeyRange1, partitionKeyRange2 }, + new CompositeContinuationToken[] { token }); + } + + [TestMethod] + public void TestMatchRangesTocontinuationTokens_OneToNone() + { + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + { + MinInclusive = string.Empty, + MaxExclusive = "A", + Id = "1" + }; + + CompositeContinuationToken token = new CompositeContinuationToken() + { + Range = new Documents.Routing.Range( + min: "B", + max: "C", + isMinInclusive: true, + isMaxInclusive: false), + Token = "asdf" + }; + + IReadOnlyDictionary expectedMapping = new Dictionary() + { + { partitionKeyRange, null }, + }; + + ContinuationResumeLogicTests.RunMatchRangesToContinuationTokens( + expectedMapping, + new PartitionKeyRange[] { partitionKeyRange }, + new CompositeContinuationToken[] { token }); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentNullException))] + public void TestMatchRangesTocontinuationTokens_ArgumentNullException() + { + ContinuationResumeLogicTests.RunMatchRangesToContinuationTokens( + expectedMapping: null, + partitionKeyRanges: new PartitionKeyRange[] { }, + partitionedTokens: null); + } + + [TestMethod] + public void TestTryGetInitializationInfo_ResumeLeftMostPartition() + { + PartitionKeyRange pkRange1 = new PartitionKeyRange() + { + MinInclusive = string.Empty, + MaxExclusive = "A", + Id = "1" + }; + + PartitionKeyRange pkRange2 = new PartitionKeyRange() + { + MinInclusive = "A", + MaxExclusive = "B", + Id = "2" + }; + + PartitionKeyRange pkRange3 = new PartitionKeyRange() + { + MinInclusive = "B", + MaxExclusive = "C", + Id = "3" + }; + + CompositeContinuationToken token = new CompositeContinuationToken() + { + Range = new Documents.Routing.Range( + min: string.Empty, + max: "A", + isMinInclusive: true, + isMaxInclusive: false), + Token = "asdf" + }; + + IReadOnlyDictionary expectedMappingLeftPartitions = new Dictionary() + { + }; + + IReadOnlyDictionary expectedMappingTargetPartition = new Dictionary() + { + { pkRange1, token} + }; + + IReadOnlyDictionary expectedMappingRightPartitions = new Dictionary() + { + { pkRange2, null}, + { pkRange3, null}, + }; + + RunTryGetInitializationInfo( + expectedMappingLeftPartitions, + expectedMappingTargetPartition, + expectedMappingRightPartitions, + new PartitionKeyRange[] { pkRange1, pkRange2, pkRange3 }, + new IPartitionedToken[] { token }); + } + + [TestMethod] + public void TestTryGetInitializationInfo_ResumeMiddlePartition() + { + PartitionKeyRange pkRange1 = new PartitionKeyRange() + { + MinInclusive = string.Empty, + MaxExclusive = "A", + Id = "1" + }; + + PartitionKeyRange pkRange2 = new PartitionKeyRange() + { + MinInclusive = "A", + MaxExclusive = "B", + Id = "2" + }; + + PartitionKeyRange pkRange3 = new PartitionKeyRange() + { + MinInclusive = "B", + MaxExclusive = "C", + Id = "3" + }; + + CompositeContinuationToken token = new CompositeContinuationToken() + { + Range = new Documents.Routing.Range( + min: "A", + max: "B", + isMinInclusive: true, + isMaxInclusive: false), + Token = "asdf" + }; + + IReadOnlyDictionary expectedMappingLeftPartitions = new Dictionary() + { + { pkRange1, null} + }; + + IReadOnlyDictionary expectedMappingTargetPartition = new Dictionary() + { + { pkRange2, token}, + }; + + IReadOnlyDictionary expectedMappingRightPartitions = new Dictionary() + { + { pkRange3, null}, + }; + + RunTryGetInitializationInfo( + expectedMappingLeftPartitions, + expectedMappingTargetPartition, + expectedMappingRightPartitions, + new PartitionKeyRange[] { pkRange1, pkRange2, pkRange3 }, + new IPartitionedToken[] { token }); + } + + [TestMethod] + public void TestTryGetInitializationInfo_ResumeRightPartition() + { + PartitionKeyRange pkRange1 = new PartitionKeyRange() + { + MinInclusive = string.Empty, + MaxExclusive = "A", + Id = "1" + }; + + PartitionKeyRange pkRange2 = new PartitionKeyRange() + { + MinInclusive = "A", + MaxExclusive = "B", + Id = "2" + }; + + PartitionKeyRange pkRange3 = new PartitionKeyRange() + { + MinInclusive = "B", + MaxExclusive = "C", + Id = "3" + }; + + CompositeContinuationToken token = new CompositeContinuationToken() + { + Range = new Documents.Routing.Range( + min: "B", + max: "C", + isMinInclusive: true, + isMaxInclusive: false), + Token = "asdf" + }; + + IReadOnlyDictionary expectedMappingLeftPartitions = new Dictionary() + { + { pkRange1, null}, + { pkRange2, null}, + }; + + IReadOnlyDictionary expectedMappingTargetPartition = new Dictionary() + { + { pkRange3, token}, + }; + + IReadOnlyDictionary expectedMappingRightPartitions = new Dictionary() + { + }; + + RunTryGetInitializationInfo( + expectedMappingLeftPartitions, + expectedMappingTargetPartition, + expectedMappingRightPartitions, + new PartitionKeyRange[] { pkRange1, pkRange2, pkRange3 }, + new IPartitionedToken[] { token }); + } + + private static void RunMatchRangesToContinuationTokens( + IReadOnlyDictionary expectedMapping, + IEnumerable partitionKeyRanges, + IEnumerable partitionedTokens) + { + IReadOnlyDictionary actualMapping = CosmosCrossPartitionQueryExecutionContext.MatchRangesToContinuationTokens( + partitionKeyRanges.OrderBy(x => Guid.NewGuid()).ToArray(), + partitionedTokens.OrderBy(x => Guid.NewGuid()).ToList()); + + ContinuationResumeLogicTests.AssertPartitionMappingAreEqual( + expectedMapping, + actualMapping); + } + + private static void RunTryGetInitializationInfo( + IReadOnlyDictionary expectedMappingLeftPartitions, + IReadOnlyDictionary expectedMappingTargetPartition, + IReadOnlyDictionary expectedMappingRightPartitions, + IEnumerable partitionKeyRanges, + IEnumerable partitionedTokens) + { + TryCatch> tryGetInitializationInfo = CosmosCrossPartitionQueryExecutionContext.TryGetInitializationInfo( + partitionKeyRanges.OrderBy(x => Guid.NewGuid()).ToArray(), + partitionedTokens.OrderBy(x => Guid.NewGuid()).ToList()); + Assert.IsTrue(tryGetInitializationInfo.Succeeded); + PartitionMapping partitionMapping = tryGetInitializationInfo.Result; + + AssertPartitionMappingAreEqual(expectedMappingLeftPartitions, partitionMapping.PartitionsLeftOfTarget); + AssertPartitionMappingAreEqual(expectedMappingTargetPartition, partitionMapping.TargetPartition); + AssertPartitionMappingAreEqual(expectedMappingRightPartitions, partitionMapping.PartitionsRightOfTarget); + } + + private static void AssertPartitionMappingAreEqual( + IReadOnlyDictionary expectedMapping, + IReadOnlyDictionary actualMapping) + { + Assert.IsNotNull(expectedMapping); + Assert.IsNotNull(actualMapping); + + Assert.AreEqual(expected: expectedMapping.Count, actual: actualMapping.Count); + + foreach (KeyValuePair kvp in expectedMapping) + { + Assert.IsTrue( + actualMapping.TryGetValue( + kvp.Key, + out IPartitionedToken partitionedToken)); + Assert.AreEqual( + expected: JsonConvert.SerializeObject(kvp.Value), + actual: JsonConvert.SerializeObject(partitionedToken)); + } + } + } +}