From e383d83946bf7c9eddeb161cd577b18609e2ac19 Mon Sep 17 00:00:00 2001 From: akotalwar <94020786+akotalwar@users.noreply.github.com> Date: Thu, 19 Jan 2023 17:32:54 -0800 Subject: [PATCH] [Internal] Query: Adds Split Support for Ode (#3572) * Added tests to test different aspects of merge/split support with OptimisticDirectExecution pipeline. Tests check for gone exception handling, pipeline switching etc. * Added gone exception simulation tests. * Added new tests and improved test infra * Removed ParalleContEvocation test. Fixed comments * Removed CreateParallelCrossPartitionPipelineStateAsync() as it is not being used anymore * Removed while loop in CreateDocumentContainerAsync() * Fixed comments. * Updated ExecuteGoneExceptionOnODEPipeline() * Added type Assert for ExecuteGoneExceptionOnODEPipeline() * Replaced try-catch with if statement in MoveNextAsync() * Added delegate to access TryCreateCoreContextAsync() * Added check to confirm Ode pipeline is not called in fallback plan * Updated method name from OptimisticDirectExecutionContext() to TryCreateOptimisticDirectExecutionContext() * Using delegate instead of Func<>. * Ode fallback plan always calls Specialized pipeline * Using ServiceInterop/Gateway to get QueryPlan for Specialized Pipeline * Added new test to check handling of failing fallback pipeline * Code cleanup * Added logic for handling non ODE continuation tokens * Moved delegate away from member variables * Added tests for Merge case * Updated method names * Added checks for tryCatch * Updated SetCancellationToken() to use Try * Updated TryUnwrapContinuationToken() * Removed changes in FlakyDocumentContainer.cs * Removed unused imports * Updated comments * Fixed comments and cleaned up test code * Added CosmosElement null check in TryUnwrapContinuationToken() * Removed FlakyDocumentContainer.cs from pull request * Removed unused imports * Updated TryUnwrapContinuationToken() * Update MoveNextAsync() call in OptimisticDirectExecutionQueryBaselineTests.cs * Made MergeTestUtil.IsFailedFallbackPipelineTest a readonly property * Added IsPartitionSplitException() overload to take CosmosElement * Fixed bug regarding syntax error queries --- .../Query/Core/Monads/TryCatch{TResult}.cs | 32 + .../Pipeline/CosmosExceptionExtensions.cs | 28 + .../CosmosQueryExecutionContextFactory.cs | 339 ++++++++--- ...OrderByCrossPartitionQueryPipelineStage.cs | 4 +- ...imisticDirectExecutionContinuationToken.cs | 6 +- ...misticDirectExecutionQueryPipelineStage.cs | 316 ++++++---- ...egativeOptimisticDirectExecutionOutput.xml | 26 + ...ositiveOptimisticDirectExecutionOutput.xml | 26 + ...misticDirectExecutionQueryBaselineTests.cs | 553 +++++++++++------- 9 files changed, 929 insertions(+), 401 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosExceptionExtensions.cs diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Monads/TryCatch{TResult}.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Monads/TryCatch{TResult}.cs index 8d9c43b036..38d0f4d1fc 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Monads/TryCatch{TResult}.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Monads/TryCatch{TResult}.cs @@ -126,6 +126,38 @@ public async Task> TryAsync( return matchResult; } + public async ValueTask> TryAsync( + Func> onSuccess) + { + TryCatch matchResult; + if (this.Succeeded) + { + matchResult = TryCatch.FromResult(await onSuccess(this.either.FromRight(default))); + } + else + { + matchResult = TryCatch.FromException(this.either.FromLeft(default)); + } + + return matchResult; + } + + public TryCatch Try( + Func> onSuccess) + { + TryCatch matchResult; + if (this.Succeeded) + { + matchResult = onSuccess(this.either.FromRight(default)); + } + else + { + matchResult = TryCatch.FromException(this.either.FromLeft(default)); + } + + return matchResult; + } + public TryCatch Catch( Action onError) { diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosExceptionExtensions.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosExceptionExtensions.cs new file mode 100644 index 0000000000..dca1ef488d --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosExceptionExtensions.cs @@ -0,0 +1,28 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline +{ + using System; + + internal static class CosmosExceptionExtensions + { + public static bool IsPartitionSplitException(this Exception ex) + { + if (ex != null) + { + return IsPartitionSplitException(ex as CosmosException); + } + + return false; + } + + public static bool IsPartitionSplitException(this CosmosException ex) + { + return ex is CosmosException cosmosException + && (cosmosException.StatusCode == System.Net.HttpStatusCode.Gone) + && (cosmosException.SubStatusCode == (int)Documents.SubStatusCodes.PartitionKeyRangeGone); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs index 36e14d78bd..371391386e 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs @@ -136,23 +136,24 @@ private static async Task> TryCreateCoreContextAsy cancellationToken); cosmosQueryContext.ContainerResourceId = containerQueryProperties.ResourceId; - Documents.PartitionKeyRange targetRange = await GetTargetRangeOptimisticDirectExecutionAsync( - inputParameters, - queryPlanFromContinuationToken, - cosmosQueryContext, - containerQueryProperties, + Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync( + inputParameters, + queryPlanFromContinuationToken, + cosmosQueryContext, + containerQueryProperties, trace); - + if (targetRange != null) { - // Test code added to confirm the correct pipeline is being utilized - SetTestInjectionPipelineType(inputParameters, OptimisticDirectExecution); - - return OptimisticDirectExecutionContext( - documentContainer, - inputParameters, - targetRange, - cancellationToken); + return await TryCreateExecutionContextAsync( + documentContainer, + partitionedQueryExecutionInfo: null, + cosmosQueryContext, + containerQueryProperties, + inputParameters, + targetRange, + trace, + cancellationToken); } PartitionedQueryExecutionInfo partitionedQueryExecutionInfo; @@ -226,33 +227,12 @@ private static async Task> TryCreateCoreContextAsy } } - if (cosmosQueryContext.QueryClient.ByPassQueryParsing()) - { - // For non-Windows platforms(like Linux and OSX) in .NET Core SDK, we cannot use ServiceInterop, so need to bypass in that case. - // We are also now bypassing this for 32 bit host process running even on Windows as there are many 32 bit apps that will not work without this - partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanThroughGatewayAsync( - cosmosQueryContext, - inputParameters.SqlQuerySpec, - cosmosQueryContext.ResourceLink, - inputParameters.PartitionKey, - createQueryPipelineTrace, - cancellationToken); - } - else - { - Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties); - - partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync( - cosmosQueryContext.QueryClient, - inputParameters.SqlQuerySpec, - cosmosQueryContext.ResourceTypeEnum, - partitionKeyDefinition, - inputParameters.PartitionKey != null, - containerQueryProperties.GeospatialType, - cosmosQueryContext.UseSystemPrefix, - createQueryPipelineTrace, - cancellationToken); - } + partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync( + cosmosQueryContext, + inputParameters, + containerQueryProperties, + createQueryPipelineTrace, + cancellationToken); } return await TryCreateFromPartitionedQueryExecutionInfoAsync( @@ -306,79 +286,187 @@ private static async Task> TryCreateFromPartitione TryCatch tryCreatePipelineStage; - Documents.PartitionKeyRange targetRange = await GetTargetRangeOptimisticDirectExecutionAsync( + Documents.PartitionKeyRange targetRange = await TryGetTargetRangeOptimisticDirectExecutionAsync( inputParameters, partitionedQueryExecutionInfo, cosmosQueryContext, - containerQueryProperties, + containerQueryProperties, trace); if (targetRange != null) { - SetTestInjectionPipelineType(inputParameters, OptimisticDirectExecution); - - tryCreatePipelineStage = CosmosQueryExecutionContextFactory.OptimisticDirectExecutionContext( + tryCreatePipelineStage = await TryCreateExecutionContextAsync( documentContainer, + partitionedQueryExecutionInfo, + cosmosQueryContext, + containerQueryProperties, inputParameters, targetRange, + trace, cancellationToken); - - return tryCreatePipelineStage; } - - if (createPassthroughQuery) + else { - SetTestInjectionPipelineType(inputParameters, Passthrough); + if (createPassthroughQuery) + { + SetTestInjectionPipelineType(inputParameters, Passthrough); - tryCreatePipelineStage = CosmosQueryExecutionContextFactory.TryCreatePassthroughQueryExecutionContext( - documentContainer, - inputParameters, - targetRanges, - cancellationToken); + tryCreatePipelineStage = CosmosQueryExecutionContextFactory.TryCreatePassthroughQueryExecutionContext( + documentContainer, + inputParameters, + targetRanges, + cancellationToken); + } + else + { + tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext(documentContainer, cosmosQueryContext, inputParameters, targetRanges, partitionedQueryExecutionInfo, cancellationToken); + } } - else + + return tryCreatePipelineStage; + } + + private static async Task> TryCreateExecutionContextAsync( + DocumentContainer documentContainer, + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, + CosmosQueryContext cosmosQueryContext, + ContainerQueryProperties containerQueryProperties, + InputParameters inputParameters, + Documents.PartitionKeyRange targetRange, + ITrace trace, + CancellationToken cancellationToken) + { + // Test code added to confirm the correct pipeline is being utilized + SetTestInjectionPipelineType(inputParameters, OptimisticDirectExecution); + + TryCatch tryCreatePipelineStage = CosmosQueryExecutionContextFactory.TryCreateOptimisticDirectExecutionContext( + documentContainer, + cosmosQueryContext, + containerQueryProperties, + inputParameters, + targetRange, + cancellationToken); + + // A malformed continuation token exception would happen for 2 reasons here + // 1. the token is actually malformed + // 2. Its a non Ode continuation token + // In both cases, Ode pipeline delegates the work to the Specialized pipeline + // as Ode pipeline should not take over execution while some other pipeline is already handling it + if (tryCreatePipelineStage.Failed && tryCreatePipelineStage.InnerMostException is MalformedContinuationTokenException) { SetTestInjectionPipelineType(inputParameters, Specialized); - if (!string.IsNullOrEmpty(partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery)) + if (partitionedQueryExecutionInfo != null) { - // We need pass down the rewritten query. - SqlQuerySpec rewrittenQuerySpec = new SqlQuerySpec() + List targetRanges = new List { - QueryText = partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery, - Parameters = inputParameters.SqlQuerySpec.Parameters + targetRange }; - inputParameters = new InputParameters( - rewrittenQuerySpec, - inputParameters.InitialUserContinuationToken, - inputParameters.InitialFeedRange, - inputParameters.MaxConcurrency, - inputParameters.MaxItemCount, - inputParameters.MaxBufferedItemCount, - inputParameters.PartitionKey, - inputParameters.Properties, - inputParameters.PartitionedQueryExecutionInfo, - inputParameters.ExecutionEnvironment, - inputParameters.ReturnResultsInDeterministicOrder, - inputParameters.ForcePassthrough, - inputParameters.TestInjections); + tryCreatePipelineStage = TryCreateSpecializedDocumentQueryExecutionContext( + documentContainer, + cosmosQueryContext, + inputParameters, + targetRanges, + partitionedQueryExecutionInfo, + cancellationToken); + } + else + { + tryCreatePipelineStage = await TryCreateSpecializedDocumentQueryExecutionContextAsync( + documentContainer, + cosmosQueryContext, + containerQueryProperties, + inputParameters, + trace, + cancellationToken); } - - tryCreatePipelineStage = CosmosQueryExecutionContextFactory.TryCreateSpecializedDocumentQueryExecutionContext( - documentContainer, - cosmosQueryContext, - inputParameters, - partitionedQueryExecutionInfo, - targetRanges, - cancellationToken); } return tryCreatePipelineStage; } - - private static TryCatch OptimisticDirectExecutionContext( + + private static TryCatch TryCreateSpecializedDocumentQueryExecutionContext( DocumentContainer documentContainer, + CosmosQueryContext cosmosQueryContext, + InputParameters inputParameters, + List targetRanges, + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, + CancellationToken cancellationToken) + { + SetTestInjectionPipelineType(inputParameters, Specialized); + + if (!string.IsNullOrEmpty(partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery)) + { + // We need pass down the rewritten query. + SqlQuerySpec rewrittenQuerySpec = new SqlQuerySpec() + { + QueryText = partitionedQueryExecutionInfo.QueryInfo.RewrittenQuery, + Parameters = inputParameters.SqlQuerySpec.Parameters + }; + + inputParameters = new InputParameters( + rewrittenQuerySpec, + inputParameters.InitialUserContinuationToken, + inputParameters.InitialFeedRange, + inputParameters.MaxConcurrency, + inputParameters.MaxItemCount, + inputParameters.MaxBufferedItemCount, + inputParameters.PartitionKey, + inputParameters.Properties, + inputParameters.PartitionedQueryExecutionInfo, + inputParameters.ExecutionEnvironment, + inputParameters.ReturnResultsInDeterministicOrder, + inputParameters.ForcePassthrough, + inputParameters.TestInjections); + } + + return CosmosQueryExecutionContextFactory.TryCreateSpecializedDocumentQueryExecutionContext( + documentContainer, + cosmosQueryContext, + inputParameters, + partitionedQueryExecutionInfo, + targetRanges, + cancellationToken); + } + + private static async Task> TryCreateSpecializedDocumentQueryExecutionContextAsync( + DocumentContainer documentContainer, + CosmosQueryContext cosmosQueryContext, + ContainerQueryProperties containerQueryProperties, + InputParameters inputParameters, + ITrace trace, + CancellationToken cancellationToken) + { + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = await GetPartitionedQueryExecutionInfoAsync( + cosmosQueryContext, + inputParameters, + containerQueryProperties, + trace, + cancellationToken); + + List targetRanges = await CosmosQueryExecutionContextFactory.GetTargetPartitionKeyRangesAsync( + cosmosQueryContext.QueryClient, + cosmosQueryContext.ResourceLink, + partitionedQueryExecutionInfo, + containerQueryProperties, + inputParameters.Properties, + inputParameters.InitialFeedRange, + trace); + + return TryCreateSpecializedDocumentQueryExecutionContext( + documentContainer, + cosmosQueryContext, + inputParameters, + targetRanges, + partitionedQueryExecutionInfo, + cancellationToken); + } + + private static TryCatch TryCreateOptimisticDirectExecutionContext( + DocumentContainer documentContainer, + CosmosQueryContext cosmosQueryContext, + ContainerQueryProperties containerQueryProperties, InputParameters inputParameters, Documents.PartitionKeyRange targetRange, CancellationToken cancellationToken) @@ -386,14 +474,26 @@ private static TryCatch OptimisticDirectExecutionContext( // Return a OptimisticDirectExecution context return OptimisticDirectExecutionQueryPipelineStage.MonadicCreate( documentContainer: documentContainer, - sqlQuerySpec: inputParameters.SqlQuerySpec, + inputParameters: inputParameters, targetRange: new FeedRangeEpk(targetRange.ToRange()), queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: inputParameters.MaxItemCount), - partitionKey: inputParameters.PartitionKey, - continuationToken: inputParameters.InitialUserContinuationToken, + fallbackQueryPipelineStageFactory: (continuationToken) => + { + // In fallback scenario, the Specialized pipeline is always invoked + Task> tryCreateContext = + CosmosQueryExecutionContextFactory.TryCreateSpecializedDocumentQueryExecutionContextAsync( + documentContainer, + cosmosQueryContext, + containerQueryProperties, + inputParameters.WithContinuationToken(continuationToken), + NoOpTrace.Singleton, + default); + + return tryCreateContext; + }, cancellationToken: cancellationToken); } - + private static TryCatch TryCreatePassthroughQueryExecutionContext( DocumentContainer documentContainer, InputParameters inputParameters, @@ -478,6 +578,45 @@ private static TryCatch TryCreateSpecializedDocumentQueryEx requestCancellationToken: cancellationToken); } + private static async Task GetPartitionedQueryExecutionInfoAsync( + CosmosQueryContext cosmosQueryContext, + InputParameters inputParameters, + ContainerQueryProperties containerQueryProperties, + ITrace trace, + CancellationToken cancellationToken) + { + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo; + if (cosmosQueryContext.QueryClient.ByPassQueryParsing()) + { + // For non-Windows platforms(like Linux and OSX) in .NET Core SDK, we cannot use ServiceInterop, so need to bypass in that case. + // We are also now bypassing this for 32 bit host process running even on Windows as there are many 32 bit apps that will not work without this + partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanThroughGatewayAsync( + cosmosQueryContext, + inputParameters.SqlQuerySpec, + cosmosQueryContext.ResourceLink, + inputParameters.PartitionKey, + trace, + cancellationToken); + } + else + { + Documents.PartitionKeyDefinition partitionKeyDefinition = GetPartitionKeyDefinition(inputParameters, containerQueryProperties); + + partitionedQueryExecutionInfo = await QueryPlanRetriever.GetQueryPlanWithServiceInteropAsync( + cosmosQueryContext.QueryClient, + inputParameters.SqlQuerySpec, + cosmosQueryContext.ResourceTypeEnum, + partitionKeyDefinition, + inputParameters.PartitionKey != null, + containerQueryProperties.GeospatialType, + cosmosQueryContext.UseSystemPrefix, + trace, + cancellationToken); + } + + return partitionedQueryExecutionInfo; + } + /// /// Gets the list of partition key ranges. /// 1. Check partition key range id @@ -606,7 +745,7 @@ private static Documents.PartitionKeyDefinition GetPartitionKeyDefinition(InputP return partitionKeyDefinition; } - private static async Task GetTargetRangeOptimisticDirectExecutionAsync( + private static async Task TryGetTargetRangeOptimisticDirectExecutionAsync( InputParameters inputParameters, PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, CosmosQueryContext cosmosQueryContext, @@ -734,6 +873,24 @@ public InputParameters( public bool ReturnResultsInDeterministicOrder { get; } public TestInjections TestInjections { get; } public bool ForcePassthrough { get; } + + public InputParameters WithContinuationToken(CosmosElement token) + { + return new InputParameters( + this.SqlQuerySpec, + token, + this.InitialFeedRange, + this.MaxConcurrency, + this.MaxItemCount, + this.MaxBufferedItemCount, + this.PartitionKey, + this.Properties, + this.PartitionedQueryExecutionInfo, + this.ExecutionEnvironment, + this.ReturnResultsInDeterministicOrder, + this.ForcePassthrough, + this.TestInjections); + } } internal sealed class AggregateProjectionDetector diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs index bbd3d7be4e..10c3b0ee5e 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs @@ -1153,9 +1153,7 @@ private static bool IsSplitException(Exception exception) exception = exception.InnerException; } - return exception is CosmosException cosmosException - && (cosmosException.StatusCode == HttpStatusCode.Gone) - && (cosmosException.SubStatusCode == (int)Documents.SubStatusCodes.PartitionKeyRangeGone); + return exception.IsPartitionSplitException(); } public void SetCancellationToken(CancellationToken cancellationToken) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs index c81c378029..59f87295f3 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionContinuationToken.cs @@ -43,11 +43,11 @@ public static CosmosElement ToCosmosElement(OptimisticDirectExecutionContinuatio public static TryCatch TryCreateFromCosmosElement(CosmosElement cosmosElement) { CosmosObject cosmosObjectContinuationToken = cosmosElement as CosmosObject; - if (cosmosObjectContinuationToken == null) + if (cosmosObjectContinuationToken == null || !cosmosObjectContinuationToken.ContainsKey(OptimisticDirectExecutionToken)) { return TryCatch.FromException( - new MalformedChangeFeedContinuationTokenException( - message: $"Malformed Continuation Token")); + new MalformedContinuationTokenException( + message: $"Malformed Continuation Token: Expected OptimisticDirectExecutionToken\r\n")); } TryCatch inner = ParallelContinuationToken.TryCreateFromCosmosElement(cosmosObjectContinuationToken[OptimisticDirectExecutionToken]); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionQueryPipelineStage.cs index 9f86b0ad8b..0664603024 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/OptimisticDirectExecution/OptimisticDirectExecutionQueryPipelineStage.cs @@ -12,168 +12,268 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQu using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Pagination; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.Pipeline; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; using Microsoft.Azure.Cosmos.Tracing; - using static Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.PartitionMapper; internal sealed class OptimisticDirectExecutionQueryPipelineStage : IQueryPipelineStage { - private readonly QueryPartitionRangePageAsyncEnumerator queryPartitionRangePageAsyncEnumerator; - - private OptimisticDirectExecutionQueryPipelineStage( - QueryPartitionRangePageAsyncEnumerator queryPartitionRangePageAsyncEnumerator) + private enum ExecutionState { - this.queryPartitionRangePageAsyncEnumerator = queryPartitionRangePageAsyncEnumerator ?? throw new ArgumentNullException(nameof(queryPartitionRangePageAsyncEnumerator)); + OptimisticDirectExecution, + SpecializedDocumentQueryExecution, } - public TryCatch Current { get; private set; } + private const string optimisticDirectExecutionToken = "OptimisticDirectExecutionToken"; + private readonly FallbackQueryPipelineStageFactory queryPipelineStageFactory; + private TryCatch inner; + private CosmosElement continuationToken; + private ExecutionState executionState; - public ValueTask DisposeAsync() + private OptimisticDirectExecutionQueryPipelineStage(TryCatch inner, FallbackQueryPipelineStageFactory queryPipelineStageFactory, CosmosElement continuationToken) { - return this.queryPartitionRangePageAsyncEnumerator.DisposeAsync(); + this.inner = inner; + this.queryPipelineStageFactory = queryPipelineStageFactory; + this.continuationToken = continuationToken; + this.executionState = ExecutionState.OptimisticDirectExecution; } - public void SetCancellationToken(CancellationToken cancellationToken) + public delegate Task> FallbackQueryPipelineStageFactory(CosmosElement continuationToken); + + public TryCatch Current => this.inner.Try(pipelineStage => pipelineStage.Current); + + public ValueTask DisposeAsync() { - this.queryPartitionRangePageAsyncEnumerator.SetCancellationToken(cancellationToken); + return this.inner.Failed ? default : this.inner.Result.DisposeAsync(); } public async ValueTask MoveNextAsync(ITrace trace) { - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } + TryCatch hasNext = await this.inner.TryAsync(pipelineStage => pipelineStage.MoveNextAsync(trace)); + bool success = hasNext.Succeeded && hasNext.Result; + bool isPartitionSplitException = hasNext.Succeeded && this.Current.Failed && this.Current.InnerMostException.IsPartitionSplitException(); - if (!await this.queryPartitionRangePageAsyncEnumerator.MoveNextAsync(trace)) + if (success && !isPartitionSplitException) { - this.Current = default; - return false; + this.continuationToken = this.Current.Succeeded ? this.Current.Result.State?.Value : null; } - - TryCatch partitionPage = this.queryPartitionRangePageAsyncEnumerator.Current; - if (partitionPage.Failed) + else if (isPartitionSplitException && this.executionState == ExecutionState.OptimisticDirectExecution) { - this.Current = partitionPage; - return true; + this.inner = await this.queryPipelineStageFactory(this.TryUnwrapContinuationToken()); + this.executionState = ExecutionState.SpecializedDocumentQueryExecution; + if (this.inner.Failed) + { + return false; + } + + success = await this.inner.Result.MoveNextAsync(trace); } - QueryPage backendQueryPage = partitionPage.Result; + return success; + } - QueryState queryState; - if (backendQueryPage.State == null) - { - queryState = null; - } - else + public void SetCancellationToken(CancellationToken cancellationToken) + { + this.inner.Try(pipelineStage => pipelineStage.SetCancellationToken(cancellationToken)); + } + + private CosmosElement TryUnwrapContinuationToken() + { + if (this.continuationToken != null) { - QueryState backendQueryState = backendQueryPage.State; - ParallelContinuationToken parallelContinuationToken = new ParallelContinuationToken( - token: (backendQueryState?.Value as CosmosString)?.Value, - range: ((FeedRangeEpk)this.queryPartitionRangePageAsyncEnumerator.FeedRangeState.FeedRange).Range); - - OptimisticDirectExecutionContinuationToken optimisticDirectExecutionContinuationToken = new OptimisticDirectExecutionContinuationToken(parallelContinuationToken); - CosmosElement cosmosElementContinuationToken = OptimisticDirectExecutionContinuationToken.ToCosmosElement(optimisticDirectExecutionContinuationToken); - queryState = new QueryState(cosmosElementContinuationToken); + CosmosObject cosmosObject = this.continuationToken as CosmosObject; + CosmosElement backendContinuationToken = cosmosObject[optimisticDirectExecutionToken]; + Debug.Assert(backendContinuationToken != null); + return CosmosArray.Create(backendContinuationToken); } - QueryPage queryPage = new QueryPage( - backendQueryPage.Documents, - backendQueryPage.RequestCharge, - backendQueryPage.ActivityId, - backendQueryPage.ResponseLengthInBytes, - backendQueryPage.CosmosQueryExecutionInfo, - disallowContinuationTokenMessage: null, - backendQueryPage.AdditionalHeaders, - queryState); - - this.Current = TryCatch.FromResult(queryPage); - return true; + return null; } public static TryCatch MonadicCreate( - IDocumentContainer documentContainer, - SqlQuerySpec sqlQuerySpec, - FeedRangeEpk targetRange, - Cosmos.PartitionKey? partitionKey, - QueryPaginationOptions queryPaginationOptions, - CosmosElement continuationToken, - CancellationToken cancellationToken) + DocumentContainer documentContainer, + CosmosQueryExecutionContextFactory.InputParameters inputParameters, + FeedRangeEpk targetRange, + QueryPaginationOptions queryPaginationOptions, + FallbackQueryPipelineStageFactory fallbackQueryPipelineStageFactory, + CancellationToken cancellationToken) { - if (targetRange == null) + TryCatch pipelineStage = OptimisticDirectExecutionQueryPipelineImpl.MonadicCreate( + documentContainer: documentContainer, + sqlQuerySpec: inputParameters.SqlQuerySpec, + targetRange: targetRange, + queryPaginationOptions: queryPaginationOptions, + partitionKey: inputParameters.PartitionKey, + continuationToken: inputParameters.InitialUserContinuationToken, + cancellationToken: cancellationToken); + + if (pipelineStage.Failed) { - throw new ArgumentNullException(nameof(targetRange)); + return pipelineStage; } - - TryCatch> monadicExtractState; - if (continuationToken == null) + + OptimisticDirectExecutionQueryPipelineStage odePipelineStageMonadicCreate = new OptimisticDirectExecutionQueryPipelineStage(pipelineStage, fallbackQueryPipelineStageFactory, inputParameters.InitialUserContinuationToken); + return TryCatch.FromResult(odePipelineStageMonadicCreate); + } + + private class OptimisticDirectExecutionQueryPipelineImpl : IQueryPipelineStage + { + private readonly QueryPartitionRangePageAsyncEnumerator queryPartitionRangePageAsyncEnumerator; + + private OptimisticDirectExecutionQueryPipelineImpl( + QueryPartitionRangePageAsyncEnumerator queryPartitionRangePageAsyncEnumerator) { - FeedRangeState getState = new (targetRange, (QueryState)null); - monadicExtractState = TryCatch>.FromResult(getState); + this.queryPartitionRangePageAsyncEnumerator = queryPartitionRangePageAsyncEnumerator ?? throw new ArgumentNullException(nameof(queryPartitionRangePageAsyncEnumerator)); } - else + + public TryCatch Current { get; private set; } + + public ValueTask DisposeAsync() { - monadicExtractState = MonadicExtractState(continuationToken, targetRange); + return this.queryPartitionRangePageAsyncEnumerator.DisposeAsync(); } - - if (monadicExtractState.Failed) + + public void SetCancellationToken(CancellationToken cancellationToken) { - return TryCatch.FromException(monadicExtractState.Exception); + this.queryPartitionRangePageAsyncEnumerator.SetCancellationToken(cancellationToken); } - FeedRangeState feedRangeState = monadicExtractState.Result; + public async ValueTask MoveNextAsync(ITrace trace) + { + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } - QueryPartitionRangePageAsyncEnumerator partitionPageEnumerator = new QueryPartitionRangePageAsyncEnumerator( - documentContainer, - sqlQuerySpec, - feedRangeState, - partitionKey, - queryPaginationOptions, - cancellationToken); + if (!await this.queryPartitionRangePageAsyncEnumerator.MoveNextAsync(trace)) + { + this.Current = default; + return false; + } - OptimisticDirectExecutionQueryPipelineStage stage = new OptimisticDirectExecutionQueryPipelineStage(partitionPageEnumerator); - return TryCatch.FromResult(stage); - } + TryCatch partitionPage = this.queryPartitionRangePageAsyncEnumerator.Current; + if (partitionPage.Failed) + { + this.Current = TryCatch.FromException(partitionPage.Exception); + return true; + } - private static TryCatch> MonadicExtractState( - CosmosElement continuationToken, - FeedRangeEpk range) - { - if (continuationToken == null) - { - throw new ArgumentNullException(nameof(continuationToken)); + QueryPage backendQueryPage = partitionPage.Result; + + QueryState queryState; + if (backendQueryPage.State == null) + { + queryState = null; + } + else + { + QueryState backendQueryState = backendQueryPage.State; + ParallelContinuationToken parallelContinuationToken = new ParallelContinuationToken( + token: (backendQueryState?.Value as CosmosString)?.Value, + range: ((FeedRangeEpk)this.queryPartitionRangePageAsyncEnumerator.FeedRangeState.FeedRange).Range); + + OptimisticDirectExecutionContinuationToken optimisticDirectExecutionContinuationToken = new OptimisticDirectExecutionContinuationToken(parallelContinuationToken); + CosmosElement cosmosElementContinuationToken = OptimisticDirectExecutionContinuationToken.ToCosmosElement(optimisticDirectExecutionContinuationToken); + queryState = new QueryState(cosmosElementContinuationToken); + } + + QueryPage queryPage = new QueryPage( + backendQueryPage.Documents, + backendQueryPage.RequestCharge, + backendQueryPage.ActivityId, + backendQueryPage.ResponseLengthInBytes, + backendQueryPage.CosmosQueryExecutionInfo, + disallowContinuationTokenMessage: null, + backendQueryPage.AdditionalHeaders, + queryState); + + this.Current = TryCatch.FromResult(queryPage); + return true; } - TryCatch tryCreateContinuationToken = OptimisticDirectExecutionContinuationToken.TryCreateFromCosmosElement(continuationToken); - if (tryCreateContinuationToken.Failed) + public static TryCatch MonadicCreate( + IDocumentContainer documentContainer, + SqlQuerySpec sqlQuerySpec, + FeedRangeEpk targetRange, + Cosmos.PartitionKey? partitionKey, + QueryPaginationOptions queryPaginationOptions, + CosmosElement continuationToken, + CancellationToken cancellationToken) { - return TryCatch>.FromException(tryCreateContinuationToken.Exception); - } + if (targetRange == null) + { + throw new ArgumentNullException(nameof(targetRange)); + } - TryCatch> partitionMappingMonad = PartitionMapper.MonadicGetPartitionMapping( - range, - tryCreateContinuationToken.Result); + TryCatch> monadicExtractState; + if (continuationToken == null) + { + FeedRangeState getState = new (targetRange, (QueryState)null); + monadicExtractState = TryCatch>.FromResult(getState); + } + else + { + monadicExtractState = MonadicExtractState(continuationToken, targetRange); + } - if (partitionMappingMonad.Failed) + if (monadicExtractState.Failed) + { + return TryCatch.FromException(monadicExtractState.Exception); + } + + FeedRangeState feedRangeState = monadicExtractState.Result; + + QueryPartitionRangePageAsyncEnumerator partitionPageEnumerator = new QueryPartitionRangePageAsyncEnumerator( + documentContainer, + sqlQuerySpec, + feedRangeState, + partitionKey, + queryPaginationOptions, + cancellationToken); + + OptimisticDirectExecutionQueryPipelineImpl stage = new OptimisticDirectExecutionQueryPipelineImpl(partitionPageEnumerator); + return TryCatch.FromResult(stage); + } + + private static TryCatch> MonadicExtractState( + CosmosElement continuationToken, + FeedRangeEpk range) { - return TryCatch>.FromException( - partitionMappingMonad.Exception); + if (continuationToken == null) + { + throw new ArgumentNullException(nameof(continuationToken)); + } + + TryCatch tryCreateContinuationToken = OptimisticDirectExecutionContinuationToken.TryCreateFromCosmosElement(continuationToken); + if (tryCreateContinuationToken.Failed) + { + return TryCatch>.FromException(tryCreateContinuationToken.Exception); + } + + TryCatch> partitionMappingMonad = PartitionMapper.MonadicGetPartitionMapping( + range, + tryCreateContinuationToken.Result); + + if (partitionMappingMonad.Failed) + { + return TryCatch>.FromException( + partitionMappingMonad.Exception); + } + + PartitionMapper.PartitionMapping partitionMapping = partitionMappingMonad.Result; + + KeyValuePair kvpRange = new KeyValuePair( + partitionMapping.TargetMapping.Keys.First(), + partitionMapping.TargetMapping.Values.First()); + + FeedRangeState feedRangeState = new FeedRangeState(kvpRange.Key, kvpRange.Value?.Token != null ? new QueryState(CosmosString.Create(kvpRange.Value.Token.Token)) : null); + + return TryCatch>.FromResult(feedRangeState); } - - PartitionMapping partitionMapping = partitionMappingMonad.Result; - - KeyValuePair kvpRange = new KeyValuePair( - partitionMapping.TargetMapping.Keys.First(), - partitionMapping.TargetMapping.Values.First()); - - FeedRangeState feedRangeState = new FeedRangeState(kvpRange.Key, kvpRange.Value?.Token != null ? new QueryState(CosmosString.Create(kvpRange.Value.Token.Token)) : null); - - return TryCatch>.FromResult(feedRangeState); } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.NegativeOptimisticDirectExecutionOutput.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.NegativeOptimisticDirectExecutionOutput.xml index b56c470469..7a8259eefc 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.NegativeOptimisticDirectExecutionOutput.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.NegativeOptimisticDirectExecutionOutput.xml @@ -38,4 +38,30 @@ false + + + Single Partition Key with Parallel continuation token + SELECT * FROM c + + /pk + + Hash + + + false + + + + + Single Partition Key with OrderBy continuation token + SELECT * FROM c ORDER BY c._ts + + /pk + + Hash + + + false + + \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.PositiveOptimisticDirectExecutionOutput.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.PositiveOptimisticDirectExecutionOutput.xml index 7d4f7ce18d..2482925bd8 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.PositiveOptimisticDirectExecutionOutput.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/OptimisticDirectExecutionQueryBaselineTests.PositiveOptimisticDirectExecutionOutput.xml @@ -38,4 +38,30 @@ true + + + Single Partition Key and Value Field + SELECT * FROM c + + /pk + + Hash + + + true + + + + + Single Partition Key and Ode continuation token + SELECT * FROM c + + /pk + + Hash + + + true + + \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs index f849a04913..6737a3d338 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs @@ -3,33 +3,31 @@ using System; using System.Collections.Generic; using System.Collections.ObjectModel; - using System.Xml; - using Microsoft.Azure.Documents; - using Microsoft.Azure.Documents.Routing; - using Microsoft.Azure.Cosmos.Query.Core; - using Microsoft.Azure.Cosmos.Test.BaselineTest; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using Newtonsoft.Json; - using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; + using System.IO; + using System.Linq; + using System.Net; + using System.Threading; using System.Threading.Tasks; + using System.Xml; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Pagination; - using Microsoft.Azure.Cosmos.Tests.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - using Microsoft.Azure.Cosmos.Query.Core.QueryClient; - using Moq; using Microsoft.Azure.Cosmos.Query; + using Microsoft.Azure.Cosmos.Query.Core; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; + using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.Pipeline; - using Microsoft.Azure.Cosmos.Routing; - using System.Threading; - using System.Linq; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using System.IO; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.OptimisticDirectExecutionQuery; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.OrderBy; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel; - using Microsoft.IdentityModel.Tokens; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; + using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; + using Microsoft.Azure.Cosmos.Test.BaselineTest; + using Microsoft.Azure.Cosmos.Tests.Pagination; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + using Microsoft.Azure.Documents.Routing; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; [TestClass] public class OptimisticDirectExecutionQueryBaselineTests : BaselineTests @@ -38,6 +36,10 @@ public class OptimisticDirectExecutionQueryBaselineTests : BaselineTests testVariations = new List { CreateInput( @@ -60,6 +62,22 @@ public void PositiveOptimisticDirectExecutionOutput() expectedOptimisticDirectExecution: true, partitionKeyPath: @"/pk", partitionKeyValue: @"value"), + + CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a", + continuationToken: null), + + CreateInput( + description: @"Single Partition Key and Ode continuation token", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a", + continuationToken: cosmosElementOdeContinuationToken), }; this.ExecuteTestSuite(testVariations); } @@ -68,6 +86,23 @@ public void PositiveOptimisticDirectExecutionOutput() [Owner("akotalwar")] public void NegativeOptimisticDirectExecutionOutput() { + ParallelContinuationToken parallelContinuationToken = new ParallelContinuationToken( + token: Guid.NewGuid().ToString(), + range: new Documents.Routing.Range("A", "B", true, false)); + + OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( + parallelContinuationToken, + new List() { new OrderByItem(CosmosObject.Create(new Dictionary() { { "item", CosmosString.Create("asdf") } })) }, + rid: "43223532", + skipCount: 42, + filter: "filter"); + + CosmosElement cosmosElementOrderByContinuationToken = CosmosArray.Create( + new List() + { + OrderByContinuationToken.ToCosmosElement(orderByContinuationToken) + }); + List testVariations = new List { CreateInput( @@ -90,58 +125,67 @@ public void NegativeOptimisticDirectExecutionOutput() expectedOptimisticDirectExecution: false, partitionKeyPath: @"/pk", partitionKeyValue: null), + + CreateInput( + description: @"Single Partition Key with Parallel continuation token", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: false, + partitionKeyPath: @"/pk", + partitionKeyValue: "a", + continuationToken: CosmosArray.Create(new List() { ParallelContinuationToken.ToCosmosElement(parallelContinuationToken) })), + + CreateInput( + description: @"Single Partition Key with OrderBy continuation token", + query: "SELECT * FROM c ORDER BY c._ts", + expectedOptimisticDirectExecution: false, + partitionKeyPath: @"/pk", + partitionKeyValue: "a", + continuationToken: cosmosElementOrderByContinuationToken), }; this.ExecuteTestSuite(testVariations); } - + // This test confirms that TestInjection.EnableOptimisticDirectExection is set to false from default. // Check test "TestPipelineForDistributedQueryAsync" to understand why this is done [TestMethod] - public async Task TestDefaultTestInjectionSettings() + public async Task TestDefaultTestInjectionSettingsAsync() { TestInjections testInjection = new TestInjections(simulate429s: false, simulateEmptyPages: false); Assert.AreEqual(testInjection.EnableOptimisticDirectExecution, false); } - [TestMethod] - [Owner("akotalwar")] - public async Task TestMonadicCreateOdePipeline() - { - int numItems = 10; - bool multiPartition = false; - string query = "SELECT * FROM c"; - - // null continuation token - Assert.IsTrue(await TryMonadicCreate(numItems, multiPartition, query, targetRange: FeedRangeEpk.FullRange, continuationToken: null)); - - CosmosElement cosmosElementContinuationToken = CosmosElement.Parse( - "{\"OptimisticDirectExecutionToken\":{\"token\":\"{\\\"resourceId\\\":\\\"AQAAAMmFOw8LAAAAAAAAAA==\\\",\\\"skipCount\\\":1}\"," + - "\"range\":{\"min\":\"\",\"max\":\"FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF\"}}}"); - Range range = new Documents.Routing.Range("", "FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF", isMinInclusive: true, isMaxInclusive: false); - - // single continuation token - Assert.IsTrue(await TryMonadicCreate(numItems, multiPartition, query, targetRange: new FeedRangeEpk(range), continuationToken: cosmosElementContinuationToken)); - - //TODO: Add non Ode continuation token case - } - - // test checks that the pipeline can take a query to the backend and returns its associated document(s). + // test checks that the pipeline can take a query to the backend and returns its associated document(s). [TestMethod] public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync() { int numItems = 100; - string query = "SELECT VALUE COUNT(1) FROM c"; + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT VALUE COUNT(1) FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a"); + + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); + DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: false); - IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null); + IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions); int documentCountInSinglePartition = 0; while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) { + Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + TryCatch tryGetPage = queryPipelineStage.Current; tryGetPage.ThrowIfFailed(); documentCountInSinglePartition += Int32.Parse(tryGetPage.Result.Documents[0].ToString()); + + if (tryGetPage.Result.State == null) + { + break; + } } Assert.AreEqual(100, documentCountInSinglePartition); @@ -152,10 +196,17 @@ public async Task TestPipelineForBackendDocumentsOnSinglePartitionAsync() public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync() { int numItems = 100; - int result = await this.CreateOptimisticPipelineAndDrainAsync( - numItems: numItems, - isMultiPartition: false, - query: "SELECT * FROM c", + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a"); + + int result = await this.GetPipelineAndDrainAsync( + input, + numItems: numItems, + isMultiPartition: false, expectedContinuationTokenCount: 10); Assert.AreEqual(numItems, result); @@ -165,9 +216,18 @@ public async Task TestPipelineForContinuationTokenOnSinglePartitionAsync() [TestMethod] public async Task TestPipelineForGoneExceptionOnSingleAndMultiplePartitionAsync() { - Assert.IsTrue(await ExecuteGoneExceptionOnOdePipeline(isMultiPartition: false)); + Assert.IsTrue(await ExecuteGoneExceptionOnODEPipeline(isMultiPartition: false)); + + Assert.IsTrue(await ExecuteGoneExceptionOnODEPipeline(isMultiPartition: true)); + } + + // test to check if failing fallback pipeline is handled properly + [TestMethod] + public async Task TestHandlingOfFailedFallbackPipelineOnSingleAndMultiplePartitionAsync() + { + Assert.IsTrue(await TestHandlingOfFailedFallbackPipeline(isMultiPartition: false)); - Assert.IsTrue(await ExecuteGoneExceptionOnOdePipeline(isMultiPartition: true)); + Assert.IsTrue(await TestHandlingOfFailedFallbackPipeline(isMultiPartition: true)); } // The reason we have the below test is to show the missing capabilities of the OptimisticDirectExecution pipeline. @@ -177,66 +237,126 @@ public async Task TestPipelineForGoneExceptionOnSingleAndMultiplePartitionAsync( public async Task TestPipelineForDistributedQueryAsync() { int numItems = 100; - int result = await this.CreateOptimisticPipelineAndDrainAsync( - numItems: numItems, - isMultiPartition: false, - query: "SELECT AVG(c) FROM c", + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT AVG(c) FROM c", + expectedOptimisticDirectExecution: false, + partitionKeyPath: @"/pk", + partitionKeyValue: "a"); + + int result = await this.GetPipelineAndDrainAsync( + input, + numItems: numItems, + isMultiPartition: false, expectedContinuationTokenCount: 0); //TODO: Add validation for actual value of average Assert.AreEqual(1, result); } - private static async Task TryMonadicCreate(int numItems, bool multiPartition, string query, FeedRangeEpk targetRange, CosmosElement continuationToken) + // Creates a gone exception after the first MoveNexyAsync() call. This allows for the pipeline to return some documents before failing + private static async Task ExecuteGoneExceptionOnODEPipeline(bool isMultiPartition) + { + int numItems = 100; + List documents = new List(); + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); + (MergeTestUtil mergeTest, IQueryPipelineStage queryPipelineStage) = await CreateFallbackPipelineTestInfrastructure(numItems, isFailedFallbackPipelineTest: false, isMultiPartition, queryRequestOptions); + + while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) + { + if (mergeTest.MoveNextCounter == 1) + { + Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + } + else + { + Assert.AreNotEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + } + + TryCatch tryGetPage = queryPipelineStage.Current; + + if (tryGetPage.Failed) + { + // failure should never come till here. Should be handled before + Assert.Fail("Unexpected error. Gone Exception should not reach till here"); + } + + documents.AddRange(tryGetPage.Result.Documents); + } + + Assert.AreEqual(numItems, documents.Count); + return true; + } + + private static async Task TestHandlingOfFailedFallbackPipeline(bool isMultiPartition) { - DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition); + int numItems = 100; + List documents = new List(); + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); + (MergeTestUtil mergeTest, IQueryPipelineStage queryPipelineStage) = await CreateFallbackPipelineTestInfrastructure(numItems, isFailedFallbackPipelineTest: true, isMultiPartition, queryRequestOptions); + + while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) + { + TryCatch tryGetPage = queryPipelineStage.Current; + if (tryGetPage.Failed) + { + if (mergeTest.MoveNextCounter == 3) + { + Assert.IsTrue(tryGetPage.InnerMostException.Message.Equals("Injected failure")); + Assert.AreNotEqual(numItems, documents.Count); + return true; + } + else + { + Assert.Fail("Fallback pipeline failure not handled correctly"); + return false; + } + } - TryCatch monadicQueryPipelineStage = OptimisticDirectExecutionQueryPipelineStage.MonadicCreate( - documentContainer: inMemoryCollection, - sqlQuerySpec: new SqlQuerySpec(query), - targetRange: targetRange, - queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - partitionKey: null, - cancellationToken: default, - continuationToken: continuationToken); + documents.AddRange(tryGetPage.Result.Documents); + } - return monadicQueryPipelineStage.Succeeded; + return false; } - private static async Task CreateOptimisticDirectExecutionPipelineStateAsync(DocumentContainer documentContainer, string query, CosmosElement continuationToken) + private static async Task<(MergeTestUtil, IQueryPipelineStage)> CreateFallbackPipelineTestInfrastructure(int numItems, bool isFailedFallbackPipelineTest, bool isMultiPartition, QueryRequestOptions queryRequestOptions) { - List targetRanges = await documentContainer.GetFeedRangesAsync( - trace: NoOpTrace.Singleton, - cancellationToken: default); + List documents = new List(); + MergeTestUtil mergeTest = new MergeTestUtil(isFailedFallbackPipelineTest); - // only one range is taken because Ode pipeline can only accept one range - FeedRangeEpk firstRange = targetRanges[0]; + OptimisticDirectExecutionTestInput input = CreateInput( + description: @"Single Partition Key and Value Field", + query: "SELECT * FROM c", + expectedOptimisticDirectExecution: true, + partitionKeyPath: @"/pk", + partitionKeyValue: "a"); - TryCatch monadicQueryPipelineStage = OptimisticDirectExecutionQueryPipelineStage.MonadicCreate( - documentContainer: documentContainer, - sqlQuerySpec: new SqlQuerySpec(query), - targetRange: firstRange, - queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), - partitionKey: null, - continuationToken: continuationToken, - cancellationToken: default); + DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync( + numItems, + multiPartition: isMultiPartition, + failureConfigs: new FlakyDocumentContainer.FailureConfigs( + inject429s: false, + injectEmptyPages: false, + shouldReturnFailure: mergeTest.ShouldReturnFailure)); - Assert.IsTrue(monadicQueryPipelineStage.Succeeded); - IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result; + IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions); - return queryPipelineStage; + return (mergeTest, queryPipelineStage); } - private async Task CreateOptimisticPipelineAndDrainAsync(int numItems, bool isMultiPartition, string query, int expectedContinuationTokenCount) + private async Task GetPipelineAndDrainAsync(OptimisticDirectExecutionTestInput input, int numItems, bool isMultiPartition, int expectedContinuationTokenCount) { + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync(numItems, multiPartition: isMultiPartition); - IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null); - + IQueryPipelineStage queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions); + List documents = new List(); int continuationTokenCount = 0; while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) { + Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + TryCatch tryGetPage = queryPipelineStage.Current; tryGetPage.ThrowIfFailed(); @@ -248,7 +368,15 @@ private async Task CreateOptimisticPipelineAndDrainAsync(int numItems, bool } else { - queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: tryGetPage.Result.State.Value); + input = CreateInput( + description: input.Description, + query: input.Query, + expectedOptimisticDirectExecution: input.ExpectedOptimisticDirectExecution, + partitionKeyPath: @"/pk", + partitionKeyValue: input.PartitionKeyValue, + continuationToken: tryGetPage.Result.State.Value); + + queryPipelineStage = await GetOdePipelineAsync(input, inMemoryCollection, queryRequestOptions); } continuationTokenCount++; @@ -258,57 +386,34 @@ private async Task CreateOptimisticPipelineAndDrainAsync(int numItems, bool return documents.Count; } - // it creates a gone exception after the first MoveNexyAsync() call. This allows for the pipeline to return some documents before failing - // TODO: With the addition of the merge/split support, this queryPipelineStage should be able to return all documents regardless of a gone exception happening - private static async Task ExecuteGoneExceptionOnOdePipeline(bool isMultiPartition) + internal static PartitionedQueryExecutionInfo GetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition) { - int numItems = 100; - string query = "SELECT * FROM c"; - List documents = new List(); - string goneExceptionMessage = $"Epk Range: Partition does not exist at the given range."; - CosmosException goneException = new CosmosException( - message: goneExceptionMessage, - statusCode: System.Net.HttpStatusCode.Gone, - subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone, - activityId: "0f8fad5b-d9cb-469f-a165-70867728950e", - requestCharge: default); - - int moveNextAsyncCounter = 0; - bool caughtGoneException = false; - DocumentContainer inMemoryCollection = await CreateDocumentContainerAsync( - numItems, - multiPartition: isMultiPartition, - failureConfigs: new FlakyDocumentContainer.FailureConfigs( - inject429s: false, - injectEmptyPages: false, - shouldReturnFailure: () => Task.FromResult(moveNextAsyncCounter == 1 ? goneException : null))); - - IQueryPipelineStage queryPipelineStage = await CreateOptimisticDirectExecutionPipelineStateAsync(inMemoryCollection, query, continuationToken: null); - while (await queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton)) - { - moveNextAsyncCounter++; - TryCatch tryGetPage = queryPipelineStage.Current; - - if (tryGetPage.Failed == true) - { - string errorRecieved = tryGetPage.Exception.InnerException.Message; - Assert.AreEqual(goneException.GetType(), tryGetPage.Exception.InnerException.GetType()); + TryCatch tryGetQueryPlan = QueryPartitionProviderTestInstance.Object.TryGetPartitionedQueryExecutionInfo( + querySpecJsonString: querySpecJsonString, + partitionKeyDefinition: pkDefinition, + requireFormattableOrderByQuery: true, + isContinuationExpected: true, + allowNonValueAggregateQuery: true, + hasLogicalPartitionKey: false, + allowDCount: true, + useSystemPrefix: false, + geospatialType: Cosmos.GeospatialType.Geography); - if (errorRecieved.Equals(goneExceptionMessage)) - { - caughtGoneException = true; - break; - } - } + return tryGetQueryPlan.Result; + } - documents.AddRange(tryGetPage.Result.Documents); - } + private static async Task GetOdePipelineAsync(OptimisticDirectExecutionTestInput input, DocumentContainer documentContainer, QueryRequestOptions queryRequestOptions) + { + (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions); - // Once fallback plan is implemented, this test should be able to return all 100 documents - Assert.AreEqual(10, documents.Count); - Assert.IsTrue(caughtGoneException); + IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( + documentContainer, + cosmosQueryContextCore, + inputParameters, + NoOpTrace.Singleton); - return true; + Assert.IsNotNull(queryPipelineStage); + return queryPipelineStage; } private static async Task CreateDocumentContainerAsync( @@ -335,7 +440,7 @@ private static async Task CreateDocumentContainerAsync( DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer); // a value of 2 would lead to 4 partitions (2 * 2). 4 partitions are used because they're easy to manage + demonstrates multi partition use case - int exponentPartitionKeyRanges = 2; + int exponentPartitionKeyRanges = 2; IReadOnlyList ranges; @@ -359,7 +464,7 @@ private static async Task CreateDocumentContainerAsync( ranges = await documentContainer.GetFeedRangesAsync( trace: NoOpTrace.Singleton, cancellationToken: default); - + int rangeCount = multiPartition ? 4 : 1; Assert.AreEqual(rangeCount, ranges.Count); @@ -367,7 +472,7 @@ private static async Task CreateDocumentContainerAsync( for (int i = 0; i < numItems; i++) { // Insert an item - CosmosObject item = CosmosObject.Parse($"{{\"pk\" : {i} }}"); + CosmosObject item = CosmosObject.Parse($"{{\"pk\" : \"a\" }}"); TryCatch monadicCreateRecord = await documentContainer.MonadicCreateItemAsync(item, cancellationToken: default); Assert.IsTrue(monadicCreateRecord.Succeeded); } @@ -400,52 +505,43 @@ private static OptimisticDirectExecutionTestInput CreateInput( return new OptimisticDirectExecutionTestInput(description, query, new SqlQuerySpec(query), expectedOptimisticDirectExecution, partitionKeyPath, partitionKeyValue, continuationToken); } - private static PartitionedQueryExecutionInfo GetPartitionedQueryExecutionInfo(string querySpecJsonString, PartitionKeyDefinition pkDefinition) - { - TryCatch tryGetQueryPlan = QueryPartitionProviderTestInstance.Object.TryGetPartitionedQueryExecutionInfo( - querySpecJsonString: querySpecJsonString, - partitionKeyDefinition: pkDefinition, - requireFormattableOrderByQuery: true, - isContinuationExpected: true, - allowNonValueAggregateQuery: true, - hasLogicalPartitionKey: false, - allowDCount: true, - useSystemPrefix: false, - geospatialType: Cosmos.GeospatialType.Geography); - - return tryGetQueryPlan.Result; - } - public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirectExecutionTestInput input) { // gets DocumentContainer IMonadicDocumentContainer monadicDocumentContainer = new InMemoryContainer(input.PartitionKeyDefinition); DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer); - SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(input.Query); + QueryRequestOptions queryRequestOptions = GetQueryRequestOptions(enableOptimisticDirectExecution: true); - // gets query context - string databaseId = "db1234"; - string resourceLink = $"dbs/{databaseId}/colls"; - CosmosQueryContextCore cosmosQueryContextCore = new CosmosQueryContextCore( - client: new TestCosmosQueryClient(), - resourceTypeEnum: Documents.ResourceType.Document, - operationType: Documents.OperationType.Query, - resourceType: typeof(QueryResponseCore), - resourceLink: resourceLink, - isContinuationExpected: true, - allowNonValueAggregateQuery: true, - useSystemPrefix: false, - correlatedActivityId: Guid.NewGuid()); + (CosmosQueryExecutionContextFactory.InputParameters inputParameters, CosmosQueryContextCore cosmosQueryContextCore) = CreateInputParamsAndQueryContext(input, queryRequestOptions); - // gets input parameters - QueryRequestOptions queryRequestOptions = new QueryRequestOptions + IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( + documentContainer, + cosmosQueryContextCore, + inputParameters, + NoOpTrace.Singleton); + + bool result = queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton).AsTask().GetAwaiter().GetResult(); + + if (input.ExpectedOptimisticDirectExecution) { - TestSettings = new TestInjections(simulate429s: true, simulateEmptyPages: false, enableOptimisticDirectExecution: true, new TestInjections.ResponseStats()) - }; + Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + } + else + { + Assert.AreNotEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + } + + Assert.IsNotNull(queryPipelineStage); + Assert.IsTrue(result); + return new OptimisticDirectExecutionTestOutput(input.ExpectedOptimisticDirectExecution); + } + + private static Tuple CreateInputParamsAndQueryContext(OptimisticDirectExecutionTestInput input, QueryRequestOptions queryRequestOptions) + { CosmosSerializerCore serializerCore = new(); - using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document)); + using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(new SqlQuerySpec(input.Query), Documents.ResourceType.Document)); string sqlQuerySpecJsonString = streamReader.ReadToEnd(); PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = GetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, input.PartitionKeyDefinition); @@ -455,7 +551,7 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect } CosmosQueryExecutionContextFactory.InputParameters inputParameters = new CosmosQueryExecutionContextFactory.InputParameters( - sqlQuerySpec: sqlQuerySpec, + sqlQuerySpec: new SqlQuerySpec(input.Query), initialUserContinuationToken: input.ContinuationToken, initialFeedRange: null, maxConcurrency: queryRequestOptions.MaxConcurrency, @@ -466,30 +562,81 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect partitionedQueryExecutionInfo: partitionedQueryExecutionInfo, executionEnvironment: null, returnResultsInDeterministicOrder: null, - forcePassthrough: true, + forcePassthrough: false, testInjections: queryRequestOptions.TestSettings); - IQueryPipelineStage queryPipelineStage = CosmosQueryExecutionContextFactory.Create( - documentContainer, - cosmosQueryContextCore, - inputParameters, - NoOpTrace.Singleton); + string databaseId = "db1234"; + string resourceLink = $"dbs/{databaseId}/colls"; + CosmosQueryContextCore cosmosQueryContextCore = new CosmosQueryContextCore( + client: new TestCosmosQueryClient(), + resourceTypeEnum: Documents.ResourceType.Document, + operationType: Documents.OperationType.Query, + resourceType: typeof(QueryResponseCore), + resourceLink: resourceLink, + isContinuationExpected: true, + allowNonValueAggregateQuery: true, + useSystemPrefix: false, + correlatedActivityId: Guid.NewGuid()); - bool result = queryPipelineStage.MoveNextAsync(NoOpTrace.Singleton).Result; + return Tuple.Create(inputParameters, cosmosQueryContextCore); + } - if (input.ExpectedOptimisticDirectExecution) + private static QueryRequestOptions GetQueryRequestOptions(bool enableOptimisticDirectExecution) + { + return new QueryRequestOptions { - Assert.AreEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + MaxConcurrency = 0, + MaxItemCount = 10, + TestSettings = new TestInjections(simulate429s: true, simulateEmptyPages: false, enableOptimisticDirectExecution: enableOptimisticDirectExecution, new TestInjections.ResponseStats()), + Properties = new Dictionary() + { + { HttpConstants.HttpHeaders.EnumerationDirection, ""}, } - else + }; + } + + private class MergeTestUtil + { + public int MoveNextCounter { get; private set; } + + public bool GoneExceptionCreated { get; private set; } + + public bool TooManyRequestsFailureCreated { get; private set; } + + public bool IsFailedFallbackPipelineTest { get; } + + public MergeTestUtil(bool isFailedFallbackPipelineTest) { - Assert.AreNotEqual(TestInjections.PipelineType.OptimisticDirectExecution, queryRequestOptions.TestSettings.Stats.PipelineType.Value); + this.IsFailedFallbackPipelineTest = isFailedFallbackPipelineTest; } - Assert.IsNotNull(queryPipelineStage); - Assert.IsTrue(result); + public async Task ShouldReturnFailure() + { + this.MoveNextCounter++; + if (this.MoveNextCounter == 2 && !this.GoneExceptionCreated) + { + this.GoneExceptionCreated = true; + return new CosmosException( + message: $"Epk Range: Partition does not exist at the given range.", + statusCode: System.Net.HttpStatusCode.Gone, + subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone, + activityId: "0f8fad5b-d9cb-469f-a165-70867728950e", + requestCharge: default); + } + + if (this.IsFailedFallbackPipelineTest && this.GoneExceptionCreated && !this.TooManyRequestsFailureCreated) + { + this.TooManyRequestsFailureCreated = true; + return new CosmosException( + message: "Injected failure", + statusCode: HttpStatusCode.TooManyRequests, + subStatusCode: 3200, + activityId: "111fad5b-d9cb-469f-a165-70867728950e", + requestCharge: 0); + } - return new OptimisticDirectExecutionTestOutput(input.ExpectedOptimisticDirectExecution); + return null; + } } } @@ -578,14 +725,14 @@ public override void SerializeAsXml(XmlWriter xmlWriter) } } } - + internal class TestCosmosQueryClient : CosmosQueryClient { public override Action OnExecuteScalarQueryCallback => throw new NotImplementedException(); public override bool ByPassQueryParsing() { - throw new NotImplementedException(); + return false; } public override void ClearSessionTokenCache(string collectionFullName) @@ -610,7 +757,11 @@ public override Task ForceRefreshCollectionCacheAsync(string collectionLink, Can public override Task GetCachedContainerQueryPropertiesAsync(string containerLink, Cosmos.PartitionKey? partitionKey, ITrace trace, CancellationToken cancellationToken) { - return Task.FromResult(new ContainerQueryProperties()); + return Task.FromResult(new ContainerQueryProperties( + "test", + WFConstants.BackendHeaders.EffectivePartitionKeyString, + new PartitionKeyDefinition(), + Cosmos.GeospatialType.Geometry)); } public override Task> GetTargetPartitionKeyRangeByFeedRangeAsync(string resourceLink, string collectionResourceId, PartitionKeyDefinition partitionKeyDefinition, FeedRangeInternal feedRangeInternal, bool forceRefresh, ITrace trace) @@ -630,7 +781,12 @@ public override Task> GetTargetPartitionKeyRangesAsync(s public override Task> GetTargetPartitionKeyRangesByEpkStringAsync(string resourceLink, string collectionResourceId, string effectivePartitionKeyString, bool forceRefresh, ITrace trace) { - throw new NotImplementedException(); + return Task.FromResult(new List{new PartitionKeyRange() + { + MinInclusive = PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, + MaxExclusive = PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey + } + }); } public override Task> TryGetOverlappingRangesAsync(string collectionResourceId, Range range, bool forceRefresh = false) @@ -638,9 +794,14 @@ public override Task> TryGetOverlappingRangesAs throw new NotImplementedException(); } - public override Task> TryGetPartitionedQueryExecutionInfoAsync(SqlQuerySpec sqlQuerySpec, ResourceType resourceType, PartitionKeyDefinition partitionKeyDefinition, bool requireFormattableOrderByQuery, bool isContinuationExpected, bool allowNonValueAggregateQuery, bool hasLogicalPartitionKey, bool allowDCount, bool useSystemPrefix, Cosmos.GeospatialType geospatialType, CancellationToken cancellationToken) - { - throw new NotImplementedException(); + public override async Task> TryGetPartitionedQueryExecutionInfoAsync(SqlQuerySpec sqlQuerySpec, ResourceType resourceType, PartitionKeyDefinition partitionKeyDefinition, bool requireFormattableOrderByQuery, bool isContinuationExpected, bool allowNonValueAggregateQuery, bool hasLogicalPartitionKey, bool allowDCount, bool useSystemPrefix, Cosmos.GeospatialType geospatialType, CancellationToken cancellationToken) + { + CosmosSerializerCore serializerCore = new(); + using StreamReader streamReader = new(serializerCore.ToStreamSqlQuerySpec(sqlQuerySpec, Documents.ResourceType.Document)); + string sqlQuerySpecJsonString = streamReader.ReadToEnd(); + + PartitionedQueryExecutionInfo partitionedQueryExecutionInfo = OptimisticDirectExecutionQueryBaselineTests.GetPartitionedQueryExecutionInfo(sqlQuerySpecJsonString, partitionKeyDefinition); + return TryCatch.FromResult(partitionedQueryExecutionInfo); } } }