From f277ce3428798ec5d90e1a6401f605e4e0cdb957 Mon Sep 17 00:00:00 2001 From: Aditya Date: Thu, 18 Jul 2024 08:34:48 -0700 Subject: [PATCH] [Internal] Query: Fixes ORDER BY issue when partial partition key is specified in RequestOptions in a query to sub-partitioned container (#4587) * Initial commit. * Addressed comments. * Addressed remaining comments. --------- Co-authored-by: neildsh <35383880+neildsh@users.noreply.github.com> --- .../HierarchicalPartitionUtils.cs | 100 ++++++++++++++++++ ...OrderByCrossPartitionQueryPipelineStage.cs | 41 ++++++- ...yQueryPartitionRangePageAsyncEnumerator.cs | 21 ++-- .../QueryPartitionRangePageAsyncEnumerator.cs | 92 +--------------- .../Query/Core/Pipeline/PipelineFactory.cs | 3 +- .../Query/OrderByPipelineStageBenchmark.cs | 3 +- .../Pagination/InMemoryContainer.cs | 22 +--- ...yPartitionRangePageAsyncEnumeratorTests.cs | 3 +- .../Pipeline/NonStreamingOrderByQueryTests.cs | 3 +- ...ByCrossPartitionQueryPipelineStageTests.cs | 39 ++++--- .../Query/QueryPlanBaselineTests.cs | 2 +- 11 files changed, 188 insertions(+), 141 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs new file mode 100644 index 0000000000..2b91c7cb40 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/HierarchicalPartitionUtils.cs @@ -0,0 +1,100 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition +{ + using System; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + + internal static class HierarchicalPartitionUtils + { + /// + /// Updates the FeedRange to limit the scope of incoming feedRange to logical partition within a single physical partition. + /// Generally speaking, a subpartitioned container can experience split partition at any level of hierarchical partition key. + /// This could cause a situation where more than one physical partition contains the data for a partial partition key. + /// Currently, enumerator instantiation does not honor physical partition boundary and allocates entire epk range which could spans across multiple physical partitions to the enumerator. + /// Since such an epk range does not exist at the container level, Service generates a GoneException. + /// This method restrics the range of each enumerator by intersecting it with physical partition range. + /// + public static FeedRangeInternal LimitFeedRangeToSinglePartition(PartitionKey? partitionKey, FeedRangeInternal feedRange, ContainerQueryProperties containerQueryProperties) + { + // We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token. + // In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state). + if (partitionKey.HasValue) + { + // ISSUE-HACK-adityasa-3/25/2024 - We should not update the original feed range inside this class. + // Instead we should guarantee that when enumerator is instantiated it is limited to a single physical partition. + // Ultimately we should remove enumerator's dependency on PartitionKey. + if ((containerQueryProperties.PartitionKeyDefinition.Paths.Count > 1) && + (partitionKey.Value.InternalKey.Components.Count != containerQueryProperties.PartitionKeyDefinition.Paths.Count) && + (feedRange is FeedRangeEpk feedRangeEpk)) + { + if (containerQueryProperties.EffectiveRangesForPartitionKey == null || + containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0) + { + throw new InvalidOperationException( + "EffectiveRangesForPartitionKey should be populated when PK is specified in request options."); + } + + foreach (Documents.Routing.Range epkForPartitionKey in containerQueryProperties.EffectiveRangesForPartitionKey) + { + if (Documents.Routing.Range.CheckOverlapping( + feedRangeEpk.Range, + epkForPartitionKey)) + { + if (!feedRangeEpk.Range.Equals(epkForPartitionKey)) + { + String overlappingMin; + bool minInclusive; + String overlappingMax; + bool maxInclusive; + + if (Documents.Routing.Range.MinComparer.Instance.Compare( + epkForPartitionKey, + feedRangeEpk.Range) < 0) + { + overlappingMin = feedRangeEpk.Range.Min; + minInclusive = feedRangeEpk.Range.IsMinInclusive; + } + else + { + overlappingMin = epkForPartitionKey.Min; + minInclusive = epkForPartitionKey.IsMinInclusive; + } + + if (Documents.Routing.Range.MaxComparer.Instance.Compare( + epkForPartitionKey, + feedRangeEpk.Range) > 0) + { + overlappingMax = feedRangeEpk.Range.Max; + maxInclusive = feedRangeEpk.Range.IsMaxInclusive; + } + else + { + overlappingMax = epkForPartitionKey.Max; + maxInclusive = epkForPartitionKey.IsMaxInclusive; + } + + feedRange = new FeedRangeEpk( + new Documents.Routing.Range( + overlappingMin, + overlappingMax, + minInclusive, + maxInclusive)); + } + + break; + } + } + } + else + { + feedRange = new FeedRangePartitionKey(partitionKey.Value); + } + } + + return feedRange; + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs index c96942f080..e7121d5770 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs @@ -43,6 +43,8 @@ private sealed class InitializationParameters { public IDocumentContainer DocumentContainer { get; } + public ContainerQueryProperties ContainerQueryProperties { get; } + public SqlQuerySpec SqlQuerySpec { get; } public IReadOnlyList TargetRanges { get; } @@ -52,11 +54,12 @@ private sealed class InitializationParameters public IReadOnlyList OrderByColumns { get; } public QueryExecutionOptions QueryPaginationOptions { get; } - + public int MaxConcurrency { get; } public InitializationParameters( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, IReadOnlyList targetRanges, PartitionKey? partitionKey, @@ -65,6 +68,7 @@ public InitializationParameters( int maxConcurrency) { this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); + this.ContainerQueryProperties = containerQueryProperties; this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec)); this.TargetRanges = targetRanges ?? throw new ArgumentNullException(nameof(targetRanges)); this.PartitionKey = partitionKey; @@ -83,6 +87,7 @@ private enum ExecutionState public static TryCatch MonadicCreate( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, IReadOnlyList targetRanges, Cosmos.PartitionKey? partitionKey, @@ -126,6 +131,7 @@ public static TryCatch MonadicCreate( { return StreamingOrderByCrossPartitionQueryPipelineStage.MonadicCreate( documentContainer, + containerQueryProperties, sqlQuerySpec, targetRanges, partitionKey, @@ -141,6 +147,7 @@ public static TryCatch MonadicCreate( return TryCatch.FromResult(NonStreamingOrderByPipelineStage.Create( documentContainer, + containerQueryProperties, rewrittenQueryForOrderBy, targetRanges, partitionKey, @@ -151,6 +158,7 @@ public static TryCatch MonadicCreate( private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens, OrderByQueryPartitionRangePageAsyncEnumerator uninitializedEnumerator, OrderByContinuationToken token, @@ -193,6 +201,7 @@ private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( // We maintain the current enumerator's range and let the RequestInvokerHandler logic kick in OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, uninitializedEnumerator.SqlQuerySpec, new FeedRangeState(uninitializedEnumerator.FeedRangeState.FeedRange, uninitializedEnumerator.StartOfPageState), partitionKey: null, @@ -210,6 +219,7 @@ private static async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, uninitializedEnumerator.SqlQuerySpec, new FeedRangeState(childRange, uninitializedEnumerator.StartOfPageState), partitionKey: null, @@ -293,6 +303,7 @@ private static bool ContainsSupportedResumeTypes(IReadOnlyList orde private sealed class StreamingOrderByCrossPartitionQueryPipelineStage : IQueryPipelineStage { private readonly IDocumentContainer documentContainer; + private readonly ContainerQueryProperties containerQueryProperties; private readonly IReadOnlyList sortOrders; private readonly PriorityQueue enumerators; private readonly Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens; @@ -315,6 +326,7 @@ private static class Expressions private StreamingOrderByCrossPartitionQueryPipelineStage( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, IReadOnlyList sortOrders, QueryExecutionOptions queryPaginationOptions, int maxConcurrency, @@ -322,6 +334,7 @@ private StreamingOrderByCrossPartitionQueryPipelineStage( QueryState state) { this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); + this.containerQueryProperties = containerQueryProperties; this.sortOrders = sortOrders ?? throw new ArgumentNullException(nameof(sortOrders)); this.enumerators = new PriorityQueue(new OrderByEnumeratorComparer(this.sortOrders)); this.queryPaginationOptions = queryPaginationOptions ?? QueryExecutionOptions.Default; @@ -332,6 +345,7 @@ private StreamingOrderByCrossPartitionQueryPipelineStage( private StreamingOrderByCrossPartitionQueryPipelineStage( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, IReadOnlyList sortOrders, PriorityQueue enumerators, Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens, @@ -339,6 +353,7 @@ private StreamingOrderByCrossPartitionQueryPipelineStage( int maxConcurrency) { this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); + this.containerQueryProperties = containerQueryProperties; this.sortOrders = sortOrders ?? throw new ArgumentNullException(nameof(sortOrders)); this.enumerators = enumerators ?? throw new ArgumentNullException(nameof(enumerators)); this.uninitializedEnumeratorsAndTokens = uninitializedEnumeratorsAndTokens ?? throw new ArgumentNullException(nameof(uninitializedEnumeratorsAndTokens)); @@ -562,6 +577,7 @@ private async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( { await OrderByCrossPartitionQueryPipelineStage.MoveNextAsync_InitializeAsync_HandleSplitAsync( this.documentContainer, + this.containerQueryProperties, this.uninitializedEnumeratorsAndTokens, uninitializedEnumerator, token, @@ -775,6 +791,7 @@ public ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellatio public static IQueryPipelineStage Create( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, IReadOnlyList sortOrders, PriorityQueue enumerators, Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> uninitializedEnumeratorsAndTokens, @@ -783,6 +800,7 @@ public static IQueryPipelineStage Create( { return new StreamingOrderByCrossPartitionQueryPipelineStage( documentContainer, + containerQueryProperties, sortOrders, enumerators, uninitializedEnumeratorsAndTokens, @@ -792,6 +810,7 @@ public static IQueryPipelineStage Create( public static TryCatch MonadicCreate( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, IReadOnlyList targetRanges, Cosmos.PartitionKey? partitionKey, @@ -815,6 +834,7 @@ public static TryCatch MonadicCreate( enumeratorsAndTokens = targetRanges .Select(range => (OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, rewrittenQueryForOrderBy, new FeedRangeState(range, state: default), partitionKey, @@ -903,6 +923,7 @@ public static TryCatch MonadicCreate( OrderByContinuationToken token = kvp.Value; OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, leftQuerySpec, new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null), partitionKey, @@ -930,6 +951,7 @@ public static TryCatch MonadicCreate( OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, targetQuerySpec, new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null), partitionKey, @@ -952,6 +974,7 @@ public static TryCatch MonadicCreate( OrderByContinuationToken token = kvp.Value; OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, rightQuerySpec, new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null), partitionKey, @@ -991,6 +1014,7 @@ public static TryCatch MonadicCreate( OrderByContinuationToken token = kvp.Value; OrderByQueryPartitionRangePageAsyncEnumerator remoteEnumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, rewrittenQueryForOrderBy, new FeedRangeState(range, token?.ParallelContinuationToken?.Token != null ? new QueryState(CosmosString.Create(token.ParallelContinuationToken.Token)) : null), partitionKey, @@ -1006,6 +1030,7 @@ public static TryCatch MonadicCreate( StreamingOrderByCrossPartitionQueryPipelineStage stage = new StreamingOrderByCrossPartitionQueryPipelineStage( documentContainer, + containerQueryProperties, orderByColumns.Select(column => column.SortOrder).ToList(), queryPaginationOptions, maxConcurrency, @@ -1740,6 +1765,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace { ITracingAsyncEnumerator> enumerator = await OrderByCrossPartitionRangePageEnumerator.CreateAsync( this.parameters.DocumentContainer, + this.parameters.ContainerQueryProperties, this.parameters.SqlQuerySpec, this.parameters.TargetRanges, this.parameters.PartitionKey, @@ -1763,6 +1789,7 @@ private async Task MoveNextAsync_InitializeAsync(ITrace public static IQueryPipelineStage Create( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, IReadOnlyList targetRanges, Cosmos.PartitionKey? partitionKey, @@ -1776,6 +1803,7 @@ public static IQueryPipelineStage Create( InitializationParameters parameters = new InitializationParameters( documentContainer, + containerQueryProperties, sqlQuerySpec, targetRanges, partitionKey, @@ -1793,20 +1821,25 @@ private sealed class OrderByCrossPartitionRangePageEnumerator : ITracingAsyncEnu { private readonly IDocumentContainer documentContainer; + private readonly ContainerQueryProperties containerQueryProperties; + private readonly Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens; public TryCatch Current { get; private set; } private OrderByCrossPartitionRangePageEnumerator( IDocumentContainer documentContainer, - Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens) + Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens, + ContainerQueryProperties containerQueryProperties) { this.documentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer)); this.enumeratorsAndTokens = enumeratorsAndTokens ?? throw new ArgumentNullException(nameof(enumeratorsAndTokens)); + this.containerQueryProperties = containerQueryProperties; } public static async Task>> CreateAsync( IDocumentContainer documentContainer, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, IReadOnlyList targetRanges, Cosmos.PartitionKey? partitionKey, @@ -1821,6 +1854,7 @@ public static async Task>> Cr { OrderByQueryPartitionRangePageAsyncEnumerator enumerator = OrderByQueryPartitionRangePageAsyncEnumerator.Create( documentContainer, + containerQueryProperties, sqlQuerySpec, new FeedRangeState(range, state: null), partitionKey, @@ -1837,7 +1871,7 @@ await ParallelPrefetch.PrefetchInParallelAsync( trace, cancellationToken); - return new OrderByCrossPartitionRangePageEnumerator(documentContainer, enumeratorsAndTokens); + return new OrderByCrossPartitionRangePageEnumerator(documentContainer, enumeratorsAndTokens, containerQueryProperties); } public async ValueTask DisposeAsync() @@ -1894,6 +1928,7 @@ public async ValueTask MoveNextAsync(ITrace trace, CancellationToken cance { await MoveNextAsync_InitializeAsync_HandleSplitAsync( this.documentContainer, + this.containerQueryProperties, this.enumeratorsAndTokens, enumerator, token, diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs index e5c70a9ccd..9d49d9a78e 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByQueryPartitionRangePageAsyncEnumerator.cs @@ -10,7 +10,8 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Pagination; using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; using Microsoft.Azure.Cosmos.Tracing; internal sealed class OrderByQueryPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator, IPrefetcher @@ -19,7 +20,8 @@ internal sealed class OrderByQueryPartitionRangePageAsyncEnumerator : PartitionR private readonly BufferedPartitionRangePageAsyncEnumeratorBase bufferedEnumerator; public static OrderByQueryPartitionRangePageAsyncEnumerator Create( - IQueryDataSource queryDataSource, + IQueryDataSource queryDataSource, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, FeedRangeState feedRangeState, PartitionKey? partitionKey, @@ -28,7 +30,8 @@ public static OrderByQueryPartitionRangePageAsyncEnumerator Create( PrefetchPolicy prefetchPolicy) { InnerEnumerator enumerator = new InnerEnumerator( - queryDataSource, + queryDataSource, + containerQueryProperties, sqlQuerySpec, feedRangeState, partitionKey, @@ -105,10 +108,12 @@ public OrderByQueryPartitionRangePageAsyncEnumerator CloneAsFullyBufferedEnumera private sealed class InnerEnumerator : PartitionRangePageAsyncEnumerator { - private readonly IQueryDataSource queryDataSource; + private readonly IQueryDataSource queryDataSource; + private readonly ContainerQueryProperties containerQueryProperties; public InnerEnumerator( - IQueryDataSource queryDataSource, + IQueryDataSource queryDataSource, + ContainerQueryProperties containerQueryProperties, SqlQuerySpec sqlQuerySpec, FeedRangeState feedRangeState, PartitionKey? partitionKey, @@ -117,6 +122,7 @@ public InnerEnumerator( : base(feedRangeState) { this.queryDataSource = queryDataSource ?? throw new ArgumentNullException(nameof(queryDataSource)); + this.containerQueryProperties = containerQueryProperties; this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec)); this.PartitionKey = partitionKey; this.QueryPaginationOptions = queryPaginationOptions ?? QueryExecutionOptions.Default; @@ -140,6 +146,7 @@ public InnerEnumerator CloneWithMaxPageSize() return new InnerEnumerator( this.queryDataSource, + this.containerQueryProperties, this.SqlQuerySpec, this.FeedRangeState, this.PartitionKey, @@ -151,9 +158,7 @@ public InnerEnumerator CloneWithMaxPageSize() protected override async Task> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken) { - // Unfortunately we need to keep both the epk range and partition key for queries - // Since the continuation token format uses epk range even though we only need the partition key to route the request. - FeedRangeInternal feedRange = this.PartitionKey.HasValue ? new FeedRangePartitionKey(this.PartitionKey.Value) : this.FeedRangeState.FeedRange; + FeedRangeInternal feedRange = HierarchicalPartitionUtils.LimitFeedRangeToSinglePartition(this.PartitionKey, this.FeedRangeState.FeedRange, this.containerQueryProperties); TryCatch monadicQueryPage = await this.queryDataSource .MonadicQueryAsync( diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs index bf10ea2af7..ebb57d0781 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/QueryPartitionRangePageAsyncEnumerator.cs @@ -46,7 +46,7 @@ protected override Task> GetNextPageAsync(ITrace trace, Canc throw new ArgumentNullException(nameof(trace)); } - FeedRangeInternal feedRange = this.LimitFeedRangeToSinglePartition(); + FeedRangeInternal feedRange = HierarchicalPartitionUtils.LimitFeedRangeToSinglePartition(this.partitionKey, this.FeedRangeState.FeedRange, this.containerQueryProperties); return this.queryDataSource.MonadicQueryAsync( sqlQuerySpec: this.sqlQuerySpec, feedRangeState: new FeedRangeState(feedRange, this.FeedRangeState.State), @@ -54,95 +54,5 @@ protected override Task> GetNextPageAsync(ITrace trace, Canc trace: trace, cancellationToken); } - - /// - /// Updates the FeedRange to limit the scope of this enumerator to single physical partition. - /// Generally speaking, a subpartitioned container can experience split partition at any level of hierarchical partition key. - /// This could cause a situation where more than one physical partition contains the data for a partial partition key. - /// Currently, enumerator instantiation does not honor physical partition boundary and allocates entire epk range which could spans across multiple physical partitions to the enumerator. - /// Since such an epk range does not exist at the container level, Service generates a GoneException. - /// This method restrics the range of each container by shrinking the ends of the range so that they do not span across physical partition. - /// - private FeedRangeInternal LimitFeedRangeToSinglePartition() - { - // We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token. - // In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state). - FeedRangeInternal feedRange = this.FeedRangeState.FeedRange; - if (this.partitionKey.HasValue) - { - // ISSUE-HACK-adityasa-3/25/2024 - We should not update the original feed range inside this class. - // Instead we should guarantee that when enumerator is instantiated it is limited to a single physical partition. - // Ultimately we should remove enumerator's dependency on PartitionKey. - if ((this.containerQueryProperties.PartitionKeyDefinition.Paths.Count > 1) && - (this.partitionKey.Value.InternalKey.Components.Count != this.containerQueryProperties.PartitionKeyDefinition.Paths.Count) && - (feedRange is FeedRangeEpk feedRangeEpk)) - { - if (this.containerQueryProperties.EffectiveRangesForPartitionKey == null || - this.containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0) - { - throw new InvalidOperationException( - "EffectiveRangesForPartitionKey should be populated when PK is specified in request options."); - } - - foreach (Documents.Routing.Range epkForPartitionKey in - this.containerQueryProperties.EffectiveRangesForPartitionKey) - { - if (Documents.Routing.Range.CheckOverlapping( - feedRangeEpk.Range, - epkForPartitionKey)) - { - if (!feedRangeEpk.Range.Equals(epkForPartitionKey)) - { - String overlappingMin; - bool minInclusive; - String overlappingMax; - bool maxInclusive; - - if (Documents.Routing.Range.MinComparer.Instance.Compare( - epkForPartitionKey, - feedRangeEpk.Range) < 0) - { - overlappingMin = feedRangeEpk.Range.Min; - minInclusive = feedRangeEpk.Range.IsMinInclusive; - } - else - { - overlappingMin = epkForPartitionKey.Min; - minInclusive = epkForPartitionKey.IsMinInclusive; - } - - if (Documents.Routing.Range.MaxComparer.Instance.Compare( - epkForPartitionKey, - feedRangeEpk.Range) > 0) - { - overlappingMax = feedRangeEpk.Range.Max; - maxInclusive = feedRangeEpk.Range.IsMaxInclusive; - } - else - { - overlappingMax = epkForPartitionKey.Max; - maxInclusive = epkForPartitionKey.IsMaxInclusive; - } - - feedRange = new FeedRangeEpk( - new Documents.Routing.Range( - overlappingMin, - overlappingMax, - minInclusive, - maxInclusive)); - } - - break; - } - } - } - else - { - feedRange = new FeedRangePartitionKey(this.partitionKey.Value); - } - } - - return feedRange; - } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs index 47208f6205..0741138f75 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs @@ -79,7 +79,8 @@ public static TryCatch MonadicCreate( queryPaginationOptions: queryPaginationOptions, maxConcurrency: maxConcurrency, nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy, - continuationToken: continuationToken); + continuationToken: continuationToken, + containerQueryProperties: containerQueryProperties); } else { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs index 872722e110..4fb4430153 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Performance.Tests/Query/OrderByPipelineStageBenchmark.cs @@ -68,7 +68,8 @@ private static async Task CreateAndRunPipeline(IDocumentContainer documentContai queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: EndUserPageSize), maxConcurrency: MaxConcurrency, nonStreamingOrderBy: nonStreamingOrderBy, - continuationToken: null); + continuationToken: null, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); IQueryPipelineStage pipeline = pipelineStage.Result; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs index 1451862287..b00d515b65 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs @@ -515,7 +515,7 @@ public virtual Task> MonadicQueryAsync( } List documents = new List(); - foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition) && IsRecordWithinQueryPartition(r, this.queryRequestOptions, this.partitionKeyDefinition))) + foreach (Record record in records.Where(r => IsRecordWithinFeedRange(r, feedRangeState.FeedRange, this.partitionKeyDefinition))) { CosmosObject document = ConvertRecordToCosmosElement(record); documents.Add(CosmosObject.Create(document)); @@ -719,26 +719,6 @@ public virtual Task> MonadicQueryAsync( } } - private bool IsRecordWithinQueryPartition(Record record, QueryRequestOptions queryRequestOptions, PartitionKeyDefinition partitionKeyDefinition) - { - if(queryRequestOptions?.PartitionKey == null) - { - return true; - } - - IList partitionKey = GetPartitionKeysFromObjectModel(queryRequestOptions.PartitionKey.Value); - IList partitionKeyFromRecord = GetPartitionKeysFromPayload(record.Payload, partitionKeyDefinition); - if (partitionKeyDefinition.Kind == PartitionKind.MultiHash) - { - PartitionKeyHash partitionKeyHash = GetHashFromPartitionKeys(partitionKey, partitionKeyDefinition); - PartitionKeyHash partitionKeyFromRecordHash = GetHashFromPartitionKeys(partitionKeyFromRecord, partitionKeyDefinition); - - return partitionKeyHash.Equals(partitionKeyFromRecordHash) || partitionKeyFromRecordHash.Value.StartsWith(partitionKeyHash.Value); - } - - return partitionKey.SequenceEqual(partitionKeyFromRecord); - } - public Task> MonadicChangeFeedAsync( FeedRangeState feedRangeState, ChangeFeedExecutionOptions changeFeedPaginationOptions, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs index e2ec35831b..6c189e92e7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs @@ -58,7 +58,8 @@ protected override Task>> CreateEnum IAsyncEnumerator> enumerator = new TracingAsyncEnumerator>( OrderByQueryPartitionRangePageAsyncEnumerator.Create( - queryDataSource: documentContainer, + queryDataSource: documentContainer, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties(), sqlQuerySpec: new Cosmos.Query.Core.SqlQuerySpec("SELECT * FROM c"), feedRangeState: new FeedRangeState(ranges[0], state), partitionKey: null, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs index 6d41312496..e579a86ee1 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/NonStreamingOrderByQueryTests.cs @@ -331,7 +331,8 @@ private static async Task RunParityTests( queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: pageSize), maxConcurrency: maxConcurrency, nonStreamingOrderBy: nonStreamingOrderBy, - continuationToken: null); + continuationToken: null, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(pipelineStage.Succeeded); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs index d0ef64dc5e..de1b1b32cf 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/OrderByCrossPartitionQueryPipelineStageTests.cs @@ -79,7 +79,8 @@ public void MonadicCreate_NullContinuationToken() queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: false, - continuationToken: null); + continuationToken: null, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); } @@ -100,7 +101,8 @@ public void MonadicCreate_NonCosmosArrayContinuationToken() queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: false, - continuationToken: CosmosObject.Create(new Dictionary())); + continuationToken: CosmosObject.Create(new Dictionary()), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Failed); Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException); } @@ -122,7 +124,8 @@ public void MonadicCreate_EmptyArrayContinuationToken() queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: false, - continuationToken: CosmosArray.Create(new List())); + continuationToken: CosmosArray.Create(new List()), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Failed); Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException); } @@ -144,7 +147,8 @@ public void MonadicCreate_NonParallelContinuationToken() queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: false, - continuationToken: CosmosArray.Create(new List() { CosmosString.Create("asdf") })); + continuationToken: CosmosArray.Create(new List() { CosmosString.Create("asdf") }), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Failed); Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException); } @@ -185,7 +189,8 @@ public void MonadicCreate_SingleOrderByContinuationToken() new List() { OrderByContinuationToken.ToCosmosElement(orderByContinuationToken) - })); + }), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); } @@ -230,7 +235,8 @@ public void MonadicCreate_SingleOrderByContinuationToken() new List() { OrderByContinuationToken.ToCosmosElement(orderByContinuationToken) - })); + }), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); } } @@ -291,7 +297,8 @@ public void MonadicCreate_MultipleOrderByContinuationToken() { OrderByContinuationToken.ToCosmosElement(orderByContinuationToken1), OrderByContinuationToken.ToCosmosElement(orderByContinuationToken2) - })); + }), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); } } @@ -333,7 +340,8 @@ public void MonadicCreate_OrderByWithResumeValues() new List() { OrderByContinuationToken.ToCosmosElement(orderByContinuationToken) - })); + }), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); } @@ -375,7 +383,8 @@ public void MonadicCreate_OrderByWithResumeValues() new List() { OrderByContinuationToken.ToCosmosElement(orderByContinuationToken) - })); + }), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); } } @@ -427,7 +436,8 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 1), maxConcurrency: 0, nonStreamingOrderBy: false, - continuationToken: CosmosElement.Parse(continuationToken)); + continuationToken: CosmosElement.Parse(continuationToken), + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; @@ -465,7 +475,8 @@ FROM c queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: nonStreamingOrderBy, - continuationToken: null); + continuationToken: null, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; @@ -514,7 +525,8 @@ FROM c queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: false, - continuationToken: null); + continuationToken: null, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); Assert.IsTrue(monadicCreate.Succeeded); IQueryPipelineStage queryPipelineStage = monadicCreate.Result; @@ -576,7 +588,8 @@ FROM c queryPaginationOptions: new QueryExecutionOptions(pageSizeHint: 10), maxConcurrency: 10, nonStreamingOrderBy: nonStreamingOrderBy, - continuationToken: continuationToken); + continuationToken: continuationToken, + containerQueryProperties: new Cosmos.Query.Core.QueryClient.ContainerQueryProperties()); monadicQueryPipelineStage.ThrowIfFailed(); IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs index ef68af02bc..7ecce80a1e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPlanBaselineTests.cs @@ -22,7 +22,7 @@ /// [TestClass] public class QueryPlanBaselineTests : BaselineTests - { + { [TestMethod] [Owner("brchon")] public void Aggregates()