Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
adityasa committed Mar 25, 2024
1 parent cba93d3 commit 5d4a1a1
Show file tree
Hide file tree
Showing 15 changed files with 698 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
cosmosQueryContext.ResourceLink,
inputParameters.PartitionKey,
createQueryPipelineTrace,
cancellationToken);
cancellationToken);
cosmosQueryContext.ContainerResourceId = containerQueryProperties.ResourceId;

Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync(
Expand Down Expand Up @@ -205,6 +205,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateCoreContextAsy
documentContainer,
inputParameters,
targetRanges,
containerQueryProperties,
cancellationToken);
}
}
Expand Down Expand Up @@ -295,11 +296,12 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateFromPartitione
documentContainer,
inputParameters,
targetRanges,
containerQueryProperties,
cancellationToken);
}
else
{
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, partitionedQueryExecutionInfo, cancellationToken);
tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, containerQueryProperties, partitionedQueryExecutionInfo, cancellationToken);
}
}

Expand Down Expand Up @@ -359,6 +361,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSinglePartitio
cosmosQueryContext,
inputParameters,
targetRanges,
containerQueryProperties,
partitionedQueryExecutionInfo,
cancellationToken);
}
Expand All @@ -382,6 +385,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
CosmosQueryContext cosmosQueryContext,
InputParameters inputParameters,
List<Documents.PartitionKeyRange> targetRanges,
ContainerQueryProperties containerQueryProperties,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -418,6 +422,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
inputParameters,
partitionedQueryExecutionInfo,
targetRanges,
containerQueryProperties,
cancellationToken);
}

Expand Down Expand Up @@ -450,6 +455,7 @@ private static async Task<TryCatch<IQueryPipelineStage>> TryCreateSpecializedDoc
cosmosQueryContext,
inputParameters,
targetRanges,
containerQueryProperties,
partitionedQueryExecutionInfo,
cancellationToken);
}
Expand All @@ -467,6 +473,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateOptimisticDirectExecutionC
documentContainer: documentContainer,
inputParameters: inputParameters,
targetRange: new FeedRangeEpk(targetRange.ToRange()),
containerQueryProperties: containerQueryProperties,
fallbackQueryPipelineStageFactory: (continuationToken) =>
{
// In fallback scenario, the Specialized pipeline is always invoked
Expand All @@ -488,6 +495,7 @@ private static TryCatch<IQueryPipelineStage> TryCreatePassthroughQueryExecutionC
DocumentContainer documentContainer,
InputParameters inputParameters,
List<Documents.PartitionKeyRange> targetRanges,
ContainerQueryProperties containerQueryProperties,
CancellationToken cancellationToken)
{
// Return a parallel context, since we still want to be able to handle splits and concurrency / buffering.
Expand All @@ -505,8 +513,9 @@ private static TryCatch<IQueryPipelineStage> TryCreatePassthroughQueryExecutionC
queryPaginationOptions: new QueryPaginationOptions(
pageSizeHint: inputParameters.MaxItemCount),
partitionKey: inputParameters.PartitionKey,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
containerQueryProperties: containerQueryProperties,
maxConcurrency: inputParameters.MaxConcurrency,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
cancellationToken: cancellationToken,
continuationToken: inputParameters.InitialUserContinuationToken);
}
Expand All @@ -516,7 +525,8 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
CosmosQueryContext cosmosQueryContext,
InputParameters inputParameters,
PartitionedQueryExecutionInfo partitionedQueryExecutionInfo,
List<Documents.PartitionKeyRange> targetRanges,
List<Documents.PartitionKeyRange> targetRanges,
ContainerQueryProperties containerQueryProperties,
CancellationToken cancellationToken)
{
QueryInfo queryInfo = partitionedQueryExecutionInfo.QueryInfo;
Expand Down Expand Up @@ -576,6 +586,7 @@ private static TryCatch<IQueryPipelineStage> TryCreateSpecializedDocumentQueryEx
queryInfo: partitionedQueryExecutionInfo.QueryInfo,
queryPaginationOptions: new QueryPaginationOptions(
pageSizeHint: (int)optimalPageSize),
containerQueryProperties: containerQueryProperties,
maxConcurrency: inputParameters.MaxConcurrency,
requestContinuationToken: inputParameters.InitialUserContinuationToken,
requestCancellationToken: cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel
using Microsoft.Azure.Cosmos.Query.Core.Exceptions;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Tracing;
using static Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.PartitionMapper;

Expand Down Expand Up @@ -137,6 +138,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
IReadOnlyList<FeedRangeEpk> targetRanges,
Cosmos.PartitionKey? partitionKey,
QueryPaginationOptions queryPaginationOptions,
ContainerQueryProperties containerQueryProperties,
int maxConcurrency,
PrefetchPolicy prefetchPolicy,
CosmosElement continuationToken,
Expand All @@ -162,7 +164,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(

CrossPartitionRangePageAsyncEnumerator<QueryPage, QueryState> crossPartitionPageEnumerator = new CrossPartitionRangePageAsyncEnumerator<QueryPage, QueryState>(
feedRangeProvider: documentContainer,
createPartitionRangeEnumerator: ParallelCrossPartitionQueryPipelineStage.MakeCreateFunction(documentContainer, sqlQuerySpec, queryPaginationOptions, partitionKey, cancellationToken),
createPartitionRangeEnumerator: ParallelCrossPartitionQueryPipelineStage.MakeCreateFunction(documentContainer, sqlQuerySpec, queryPaginationOptions, partitionKey, containerQueryProperties, cancellationToken),
comparer: Comparer.Singleton,
maxConcurrency: maxConcurrency,
prefetchPolicy: prefetchPolicy,
Expand Down Expand Up @@ -249,12 +251,14 @@ private static CreatePartitionRangePageAsyncEnumerator<QueryPage, QueryState> Ma
SqlQuerySpec sqlQuerySpec,
QueryPaginationOptions queryPaginationOptions,
Cosmos.PartitionKey? partitionKey,
ContainerQueryProperties containerQueryProperties,
CancellationToken cancellationToken) => (FeedRangeState<QueryState> feedRangeState) => new QueryPartitionRangePageAsyncEnumerator(
queryDataSource,
sqlQuerySpec,
feedRangeState,
partitionKey,
queryPaginationOptions,
containerQueryProperties,
cancellationToken);

public void SetCancellationToken(CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class QueryPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator<QueryPage, QueryState>
{
private readonly IQueryDataSource queryDataSource;
private readonly SqlQuerySpec sqlQuerySpec;
private readonly QueryPaginationOptions queryPaginationOptions;
private readonly ContainerQueryProperties containerQueryProperties;
private readonly Cosmos.PartitionKey? partitionKey;

public QueryPartitionRangePageAsyncEnumerator(
Expand All @@ -25,13 +27,15 @@ public QueryPartitionRangePageAsyncEnumerator(
FeedRangeState<QueryState> feedRangeState,
Cosmos.PartitionKey? partitionKey,
QueryPaginationOptions queryPaginationOptions,
ContainerQueryProperties containerQueryProperties,
CancellationToken cancellationToken)
: base(feedRangeState, cancellationToken)
{
this.queryDataSource = queryDataSource ?? throw new ArgumentNullException(nameof(queryDataSource));
this.sqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
this.queryPaginationOptions = queryPaginationOptions;
this.partitionKey = partitionKey;
this.containerQueryProperties = containerQueryProperties;
}

public override ValueTask DisposeAsync() => default;
Expand All @@ -43,15 +47,92 @@ protected override Task<TryCatch<QueryPage>> GetNextPageAsync(ITrace trace, Canc
throw new ArgumentNullException(nameof(trace));
}

// We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token.
// In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state).
FeedRangeInternal feedRange = this.partitionKey.HasValue ? new FeedRangePartitionKey(this.partitionKey.Value) : this.FeedRangeState.FeedRange;
FeedRangeInternal feedRange = this.LimitFeedRangeToSinglePartition();
return this.queryDataSource.MonadicQueryAsync(
sqlQuerySpec: this.sqlQuerySpec,
feedRangeState: new FeedRangeState<QueryState>(feedRange, this.FeedRangeState.State),
queryPaginationOptions: this.queryPaginationOptions,
trace: trace,
cancellationToken);
}

/// <summary>
/// Updates the FeedRange to limit the scope of this enumerator to single physical partition.
/// Generally speaking, a subpartitioned container can experience split partition at any level of hierarchical partition key.
/// This could cause a situation where more than one physical partition contains the data for a partial partition key.
/// Currently, enumerator instantiation does not honor physical partition boundary and allocates entire epk range which could spans across multiple physical partitions to the enumerator.
/// Since such an epk range does not exist at the container level, Service generates a GoneException.
/// This method restrics the range of each container by shrinking the ends of the range so that they do not span across physical partition.
/// </summary>
private FeedRangeInternal LimitFeedRangeToSinglePartition()
{
// We sadly need to check the partition key, since a user can set a partition key in the request options with a different continuation token.
// In the future the partition filtering and continuation information needs to be a tightly bounded contract (like cross feed range state).
FeedRangeInternal feedRange = this.FeedRangeState.FeedRange;

if (feedRange is FeedRangeEpk feedRangeEpk && this.partitionKey.HasValue)
{
if (this.containerQueryProperties.EffectiveRangesForPartitionKey == null ||
this.containerQueryProperties.EffectiveRangesForPartitionKey.Count == 0)
{
throw new InvalidOperationException(
"EffectiveRangesForPartitionKey should be populated when PK is specified in request options.");
}

foreach (Documents.Routing.Range<String> epkForPartitionKey in
this.containerQueryProperties.EffectiveRangesForPartitionKey)
{
if (Documents.Routing.Range<String>.CheckOverlapping(
feedRangeEpk.Range,
epkForPartitionKey))
{
if (!feedRangeEpk.Range.Equals(epkForPartitionKey))
{
String overlappingMin;
bool minInclusive;
String overlappingMax;
bool maxInclusive;

if (Documents.Routing.Range<String>.MinComparer.Instance.Compare(
epkForPartitionKey,
feedRangeEpk.Range) < 0)
{
overlappingMin = feedRangeEpk.Range.Min;
minInclusive = feedRangeEpk.Range.IsMinInclusive;
}
else
{
overlappingMin = epkForPartitionKey.Min;
minInclusive = epkForPartitionKey.IsMinInclusive;
}

if (Documents.Routing.Range<String>.MaxComparer.Instance.Compare(
epkForPartitionKey,
feedRangeEpk.Range) > 0)
{
overlappingMax = feedRangeEpk.Range.Max;
maxInclusive = feedRangeEpk.Range.IsMaxInclusive;
}
else
{
overlappingMax = epkForPartitionKey.Max;
maxInclusive = epkForPartitionKey.IsMaxInclusive;
}

feedRange = new FeedRangeEpk(
new Documents.Routing.Range<String>(
overlappingMin,
overlappingMax,
minInclusive,
maxInclusive));
}

break;
}
}
}

return feedRange;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

Expand Down Expand Up @@ -142,6 +143,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
DocumentContainer documentContainer,
CosmosQueryExecutionContextFactory.InputParameters inputParameters,
FeedRangeEpk targetRange,
ContainerQueryProperties containerQueryProperties,
FallbackQueryPipelineStageFactory fallbackQueryPipelineStageFactory,
CancellationToken cancellationToken)
{
Expand All @@ -152,6 +154,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
targetRange: targetRange,
queryPaginationOptions: paginationOptions,
partitionKey: inputParameters.PartitionKey,
containerQueryProperties: containerQueryProperties,
continuationToken: inputParameters.InitialUserContinuationToken,
cancellationToken: cancellationToken);

Expand Down Expand Up @@ -247,6 +250,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
FeedRangeEpk targetRange,
Cosmos.PartitionKey? partitionKey,
QueryPaginationOptions queryPaginationOptions,
ContainerQueryProperties containerQueryProperties,
CosmosElement continuationToken,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -283,6 +287,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
feedRangeState,
partitionKey,
queryPaginationOptions,
containerQueryProperties,
cancellationToken);

OptimisticDirectExecutionQueryPipelineImpl stage = new OptimisticDirectExecutionQueryPipelineImpl(partitionPageEnumerator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Skip;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Take;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;

internal static class PipelineFactory
Expand All @@ -32,6 +33,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
PartitionKey? partitionKey,
QueryInfo queryInfo,
QueryPaginationOptions queryPaginationOptions,
ContainerQueryProperties containerQueryProperties,
int maxConcurrency,
CosmosElement requestContinuationToken,
CancellationToken requestCancellationToken)
Expand Down Expand Up @@ -89,6 +91,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
targetRanges: targetRanges,
queryPaginationOptions: queryPaginationOptions,
partitionKey: partitionKey,
containerQueryProperties: containerQueryProperties,
prefetchPolicy: prefetchPolicy,
maxConcurrency: maxConcurrency,
continuationToken: continuationToken,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Results>
<Result>
<Input />
<Output>
<Documents><![CDATA[{"id":"2","value2":"12"},
{"id":"2","value2":"17"},
{"id":"2","value2":"2"},
{"id":"2","value2":"22"},
{"id":"2","value2":"27"},
{"id":"2","value2":"32"},
{"id":"2","value2":"37"},
{"id":"2","value2":"42"},
{"id":"2","value2":"47"},
{"id":"2","value2":"52"},
{"id":"2","value2":"57"},
{"id":"2","value2":"62"},
{"id":"2","value2":"67"},
{"id":"2","value2":"7"},
{"id":"2","value2":"72"},
{"id":"2","value2":"77"},
{"id":"2","value2":"82"},
{"id":"2","value2":"87"},
{"id":"2","value2":"92"},
{"id":"2","value2":"97"}]]></Documents>
</Output>
</Result>
</Results>
Loading

0 comments on commit 5d4a1a1

Please sign in to comment.