From 3b657ea26af71a4ab8bdefdb7b54419806a89788 Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Tue, 25 Jun 2024 10:59:36 -0700 Subject: [PATCH 1/3] remove ExecutionEnvironment and the Compute flavor of the query pipeline stages --- .../AggregateQueryPipelineStage.Client.cs | 2 +- .../AggregateQueryPipelineStage.Compute.cs | 254 ------------------ .../Aggregate/AggregateQueryPipelineStage.cs | 19 +- .../CosmosQueryExecutionContextFactory.cs | 7 - .../DCount/DCountQueryPipelineStage.Client.cs | 2 +- .../DCountQueryPipelineStage.Compute.cs | 244 ----------------- .../DCount/DCountQueryPipelineStage.cs | 35 ++- .../DistinctQueryPipelineStage.Client.cs | 2 +- .../DistinctQueryPipelineStage.Compute.cs | 206 -------------- .../Distinct/DistinctQueryPipelineStage.cs | 20 +- .../Core/Pipeline/ExecutionEnvironment.cs | 22 -- .../GroupByQueryPipelineStage.Client.cs | 2 +- .../GroupByQueryPipelineStage.Compute.cs | 253 ----------------- .../GroupBy/GroupByQueryPipelineStage.cs | 32 +-- .../Query/Core/Pipeline/PipelineFactory.cs | 8 - .../Skip/SkipQueryPipelineStage.Client.cs | 2 +- .../Skip/SkipQueryPipelineStage.Compute.cs | 222 --------------- .../Pipeline/Skip/SkipQueryPipelineStage.cs | 13 +- .../Take/TakeQueryPipelineStage.Compute.cs | 224 --------------- 19 files changed, 46 insertions(+), 1523 deletions(-) delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Compute.cs delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Compute.cs delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Compute.cs delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/ExecutionEnvironment.cs delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Compute.cs delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Compute.cs delete mode 100644 Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Compute.cs diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs index 7966918fcc..4d2c095eda 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Client.cs @@ -28,7 +28,7 @@ private ClientAggregateQueryPipelineStage( // all the work is done in the base constructor. } - public static TryCatch MonadicCreate( + public static new TryCatch MonadicCreate( IReadOnlyList aggregates, IReadOnlyDictionary aliasToAggregateType, IReadOnlyList orderedAliases, diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Compute.cs deleted file mode 100644 index fb938b530d..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.Compute.cs +++ /dev/null @@ -1,254 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class AggregateQueryPipelineStage : QueryPipelineStageBase - { - private static readonly IReadOnlyList EmptyResults = new List().AsReadOnly(); - - private sealed class ComputeAggregateQueryPipelineStage : AggregateQueryPipelineStage - { - private static readonly CosmosString DoneSourceToken = CosmosString.Create("DONE"); - - private ComputeAggregateQueryPipelineStage( - IQueryPipelineStage source, - SingleGroupAggregator singleGroupAggregator, - bool isValueAggregateQuery) - : base(source, singleGroupAggregator, isValueAggregateQuery) - { - // all the work is done in the base constructor. - } - - public static TryCatch MonadicCreate( - IReadOnlyList aggregates, - IReadOnlyDictionary aliasToAggregateType, - IReadOnlyList orderedAliases, - bool hasSelectValue, - CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) - { - AggregateContinuationToken aggregateContinuationToken; - if (continuationToken != null) - { - if (!AggregateContinuationToken.TryCreateFromCosmosElement( - continuationToken, - out aggregateContinuationToken)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Malfomed {nameof(AggregateContinuationToken)}: '{continuationToken}'")); - } - } - else - { - aggregateContinuationToken = new AggregateContinuationToken(singleGroupAggregatorContinuationToken: null, sourceContinuationToken: null); - } - - TryCatch tryCreateSingleGroupAggregator = SingleGroupAggregator.TryCreate( - aggregates, - aliasToAggregateType, - orderedAliases, - hasSelectValue, - aggregateContinuationToken.SingleGroupAggregatorContinuationToken); - if (tryCreateSingleGroupAggregator.Failed) - { - return TryCatch.FromException(tryCreateSingleGroupAggregator.Exception); - } - - TryCatch tryCreateSource; - if (aggregateContinuationToken.SourceContinuationToken is CosmosString stringToken && (stringToken.Value == DoneSourceToken.Value)) - { - tryCreateSource = TryCatch.FromResult(EmptyQueryPipelineStage.Singleton); - } - else - { - tryCreateSource = monadicCreatePipelineStage(aggregateContinuationToken.SourceContinuationToken); - } - - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - ComputeAggregateQueryPipelineStage stage = new ComputeAggregateQueryPipelineStage( - tryCreateSource.Result, - tryCreateSingleGroupAggregator.Result, - hasSelectValue); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.returnedFinalPage) - { - this.Current = default; - return false; - } - - // Draining aggregates is broken down into two stages - QueryPage queryPage; - if (await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - // Stage 1: - // Drain the aggregates fully from all continuations and all partitions - // And return empty pages in the meantime. - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - foreach (CosmosElement element in sourcePage.Documents) - { - cancellationToken.ThrowIfCancellationRequested(); - - RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections( - this.isValueQuery, - element); - this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload); - } - - AggregateContinuationToken aggregateContinuationToken = new AggregateContinuationToken( - singleGroupAggregatorContinuationToken: this.singleGroupAggregator.GetCosmosElementContinuationToken(), - sourceContinuationToken: sourcePage.State != null ? sourcePage.State.Value : DoneSourceToken); - QueryState queryState = new QueryState(AggregateContinuationToken.ToCosmosElement(aggregateContinuationToken)); - QueryPage emptyPage = new QueryPage( - documents: EmptyResults, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: queryState, - streaming: sourcePage.Streaming); - - queryPage = emptyPage; - } - else - { - // Stage 2: - // Return the final page after draining. - List finalResult = new List(); - CosmosElement aggregationResult = this.singleGroupAggregator.GetResult(); - if (aggregationResult != null) - { - finalResult.Add(aggregationResult); - } - - QueryPage finalPage = new QueryPage( - documents: finalResult, - requestCharge: default, - activityId: default, - cosmosQueryExecutionInfo: default, - distributionPlanSpec: default, - disallowContinuationTokenMessage: default, - additionalHeaders: default, - state: default, - streaming: default); - - queryPage = finalPage; - this.returnedFinalPage = true; - } - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - private sealed class AggregateContinuationToken - { - private const string SourceTokenName = "SourceToken"; - private const string AggregationTokenName = "AggregationToken"; - - public AggregateContinuationToken( - CosmosElement singleGroupAggregatorContinuationToken, - CosmosElement sourceContinuationToken) - { - this.SingleGroupAggregatorContinuationToken = singleGroupAggregatorContinuationToken; - this.SourceContinuationToken = sourceContinuationToken; - } - - public CosmosElement SingleGroupAggregatorContinuationToken { get; } - - public CosmosElement SourceContinuationToken { get; } - - public static CosmosElement ToCosmosElement(AggregateContinuationToken aggregateContinuationToken) - { - Dictionary dictionary = new Dictionary() - { - { - AggregateContinuationToken.SourceTokenName, - aggregateContinuationToken.SourceContinuationToken - }, - { - AggregateContinuationToken.AggregationTokenName, - aggregateContinuationToken.SingleGroupAggregatorContinuationToken - } - }; - - return CosmosObject.Create(dictionary); - } - - public static bool TryCreateFromCosmosElement( - CosmosElement continuationToken, - out AggregateContinuationToken aggregateContinuationToken) - { - if (continuationToken == null) - { - throw new ArgumentNullException(nameof(continuationToken)); - } - - if (!(continuationToken is CosmosObject rawAggregateContinuationToken)) - { - aggregateContinuationToken = default; - return false; - } - - if (!rawAggregateContinuationToken.TryGetValue( - AggregateContinuationToken.AggregationTokenName, - out CosmosElement singleGroupAggregatorContinuationToken)) - { - aggregateContinuationToken = default; - return false; - } - - if (!rawAggregateContinuationToken.TryGetValue( - AggregateContinuationToken.SourceTokenName, - out CosmosElement sourceContinuationToken)) - { - aggregateContinuationToken = default; - return false; - } - - aggregateContinuationToken = new AggregateContinuationToken( - singleGroupAggregatorContinuationToken: singleGroupAggregatorContinuationToken, - sourceContinuationToken: sourceContinuationToken); - return true; - } - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs index cf681d0d4d..57eac41990 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Aggregate/AggregateQueryPipelineStage.cs @@ -58,30 +58,21 @@ public AggregateQueryPipelineStage( } public static TryCatch MonadicCreate( - ExecutionEnvironment executionEnvironment, IReadOnlyList aggregates, IReadOnlyDictionary aliasToAggregateType, IReadOnlyList orderedAliases, bool hasSelectValue, CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) => executionEnvironment switch - { - ExecutionEnvironment.Client => ClientAggregateQueryPipelineStage.MonadicCreate( - aggregates, - aliasToAggregateType, - orderedAliases, - hasSelectValue, - continuationToken, - monadicCreatePipelineStage), - ExecutionEnvironment.Compute => ComputeAggregateQueryPipelineStage.MonadicCreate( + MonadicCreatePipelineStage monadicCreatePipelineStage) + { + return ClientAggregateQueryPipelineStage.MonadicCreate( aggregates, aliasToAggregateType, orderedAliases, hasSelectValue, continuationToken, - monadicCreatePipelineStage), - _ => throw new ArgumentException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}."), - }; + monadicCreatePipelineStage); + } /// /// Struct for getting the payload out of the rewritten projection. diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs index ceff8377b4..6d512d89f0 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs @@ -407,7 +407,6 @@ private static TryCatch TryCreateSpecializedDocumentQueryEx inputParameters.PartitionKey, inputParameters.Properties, inputParameters.PartitionedQueryExecutionInfo, - inputParameters.ExecutionEnvironment, inputParameters.ReturnResultsInDeterministicOrder, inputParameters.EnableOptimisticDirectExecution, inputParameters.IsNonStreamingOrderByQueryFeatureDisabled, @@ -559,7 +558,6 @@ private static TryCatch TryCreateSpecializedDocumentQueryEx $"Invalid MaxItemCount {optimalPageSize}"); return PipelineFactory.MonadicCreate( - executionEnvironment: inputParameters.ExecutionEnvironment, documentContainer: documentContainer, sqlQuerySpec: inputParameters.SqlQuerySpec, targetRanges: targetRanges @@ -831,7 +829,6 @@ public sealed class InputParameters private const int DefaultMaxItemCount = 1000; private const int DefaultMaxBufferedItemCount = 1000; private const bool DefaultReturnResultsInDeterministicOrder = true; - private const ExecutionEnvironment DefaultExecutionEnvironment = ExecutionEnvironment.Client; public InputParameters( SqlQuerySpec sqlQuerySpec, @@ -843,7 +840,6 @@ public InputParameters( PartitionKey? partitionKey, IReadOnlyDictionary properties, PartitionedQueryExecutionInfo partitionedQueryExecutionInfo, - ExecutionEnvironment? executionEnvironment, bool? returnResultsInDeterministicOrder, bool enableOptimisticDirectExecution, bool isNonStreamingOrderByQueryFeatureDisabled, @@ -877,7 +873,6 @@ public InputParameters( this.PartitionKey = partitionKey; this.Properties = properties; this.PartitionedQueryExecutionInfo = partitionedQueryExecutionInfo; - this.ExecutionEnvironment = executionEnvironment.GetValueOrDefault(InputParameters.DefaultExecutionEnvironment); this.ReturnResultsInDeterministicOrder = returnResultsInDeterministicOrder.GetValueOrDefault(InputParameters.DefaultReturnResultsInDeterministicOrder); this.EnableOptimisticDirectExecution = enableOptimisticDirectExecution; this.IsNonStreamingOrderByQueryFeatureDisabled = isNonStreamingOrderByQueryFeatureDisabled; @@ -893,7 +888,6 @@ public InputParameters( public PartitionKey? PartitionKey { get; } public IReadOnlyDictionary Properties { get; } public PartitionedQueryExecutionInfo PartitionedQueryExecutionInfo { get; } - public ExecutionEnvironment ExecutionEnvironment { get; } public bool ReturnResultsInDeterministicOrder { get; } public TestInjections TestInjections { get; } public bool EnableOptimisticDirectExecution { get; } @@ -911,7 +905,6 @@ public InputParameters WithContinuationToken(CosmosElement token) this.PartitionKey, this.Properties, this.PartitionedQueryExecutionInfo, - this.ExecutionEnvironment, this.ReturnResultsInDeterministicOrder, this.EnableOptimisticDirectExecution, this.IsNonStreamingOrderByQueryFeatureDisabled, diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs index 0e09e2c159..afdd35c0b4 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Client.cs @@ -31,7 +31,7 @@ private ClientDCountQueryPipelineStage( // all the work is done in the base constructor. } - public static TryCatch MonadicCreate( + public static new TryCatch MonadicCreate( DCountInfo info, CosmosElement continuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Compute.cs deleted file mode 100644 index 84f8cfff64..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.Compute.cs +++ /dev/null @@ -1,244 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.DCount -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.CosmosElements.Numbers; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate.Aggregators; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class DCountQueryPipelineStage : QueryPipelineStageBase - { - private static readonly IReadOnlyList EmptyResults = new List().AsReadOnly(); - - private sealed class ComputeDCountQueryPipelineStage : DCountQueryPipelineStage - { - private static readonly CosmosString DoneSourceToken = CosmosString.Create("DONE"); - - private ComputeDCountQueryPipelineStage( - IQueryPipelineStage source, - long count, - DCountInfo info) - : base(source, count, info) - { - // all the work is done in the base constructor. - } - - public static TryCatch MonadicCreate( - DCountInfo info, - CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) - { - DCountContinuationToken dcountContinuationToken; - if (continuationToken != null) - { - if (!DCountContinuationToken.TryCreateFromCosmosElement( - continuationToken, - out dcountContinuationToken)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Malfomed {nameof(DCountContinuationToken)}: '{continuationToken}'")); - } - } - else - { - dcountContinuationToken = new DCountContinuationToken(count: 0, sourceContinuationToken: null); - } - - TryCatch tryCreateSource; - if (dcountContinuationToken.SourceContinuationToken is CosmosString stringToken && (stringToken.Value == DoneSourceToken.Value)) - { - tryCreateSource = TryCatch.FromResult(EmptyQueryPipelineStage.Singleton); - } - else - { - tryCreateSource = monadicCreatePipelineStage(dcountContinuationToken.SourceContinuationToken); - } - - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - ComputeDCountQueryPipelineStage stage = new ComputeDCountQueryPipelineStage( - tryCreateSource.Result, - dcountContinuationToken.Count, - info); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.returnedFinalPage) - { - this.Current = default; - return false; - } - - // Draining aggregates is broken down into two stages - QueryPage queryPage; - if (await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - // Stage 1: - // Drain the aggregates fully from all continuations and all partitions - // And return empty pages in the meantime. - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - cancellationToken.ThrowIfCancellationRequested(); - this.count += sourcePage.Documents.Count; - - DCountContinuationToken dcountContinuationToken = new DCountContinuationToken( - count: this.count, - sourceContinuationToken: sourcePage.State != null ? sourcePage.State.Value : DoneSourceToken); - QueryState queryState = new QueryState(DCountContinuationToken.ToCosmosElement(dcountContinuationToken)); - QueryPage emptyPage = new QueryPage( - documents: EmptyResults, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: queryState, - streaming: sourcePage.Streaming); - - queryPage = emptyPage; - } - else - { - // Stage 2: - // Return the final page after draining. - List finalResult = new List(); - CosmosElement aggregationResult = this.GetFinalResult(); - if (aggregationResult != null) - { - finalResult.Add(aggregationResult); - } - - QueryPage finalPage = new QueryPage( - documents: finalResult, - requestCharge: default, - activityId: default, - cosmosQueryExecutionInfo: default, - distributionPlanSpec: default, - disallowContinuationTokenMessage: default, - additionalHeaders: default, - state: default, - streaming: default); - - queryPage = finalPage; - this.returnedFinalPage = true; - } - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - private readonly struct DCountContinuationToken - { - private const string SourceTokenName = "SourceToken"; - private const string DCountTokenName = "DCountToken"; - - public DCountContinuationToken( - long count, - CosmosElement sourceContinuationToken) - { - this.Count = count; - this.SourceContinuationToken = sourceContinuationToken; - } - - public long Count { get; } - - public CosmosElement CountToken => CosmosNumber64.Create(this.Count); - - public CosmosElement SourceContinuationToken { get; } - - public static CosmosElement ToCosmosElement(DCountContinuationToken dcountContinuationToken) - { - Dictionary dictionary = new Dictionary() - { - { - DCountContinuationToken.SourceTokenName, - dcountContinuationToken.SourceContinuationToken - }, - { - DCountContinuationToken.DCountTokenName, - dcountContinuationToken.CountToken - } - }; - - return CosmosObject.Create(dictionary); - } - - public static bool TryCreateFromCosmosElement( - CosmosElement continuationToken, - out DCountContinuationToken dContinuationToken) - { - if (continuationToken == null) - { - throw new ArgumentNullException(nameof(continuationToken)); - } - - if (!(continuationToken is CosmosObject rawAggregateContinuationToken)) - { - dContinuationToken = default; - return false; - } - - if (!rawAggregateContinuationToken.TryGetValue( - DCountContinuationToken.DCountTokenName, - out CosmosElement countToken)) - { - dContinuationToken = default; - return false; - } - - if (!(countToken is CosmosNumber count)) - { - dContinuationToken = default; - return false; - } - - if (!rawAggregateContinuationToken.TryGetValue( - DCountContinuationToken.SourceTokenName, - out CosmosElement sourceContinuationToken)) - { - dContinuationToken = default; - return false; - } - - dContinuationToken = new DCountContinuationToken( - count: Number64.ToLong(count.Value), - sourceContinuationToken: sourceContinuationToken); - return true; - } - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs index 3f4079ad45..aa43a55ee6 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/DCount/DCountQueryPipelineStage.cs @@ -47,27 +47,24 @@ public DCountQueryPipelineStage( } public static TryCatch MonadicCreate( - ExecutionEnvironment executionEnvironment, DCountInfo info, CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) => executionEnvironment switch - { - ExecutionEnvironment.Client => ClientDCountQueryPipelineStage.MonadicCreate( - info, - continuationToken, - monadicCreatePipelineStage), - ExecutionEnvironment.Compute => ComputeDCountQueryPipelineStage.MonadicCreate( - info, - continuationToken, - monadicCreatePipelineStage), - _ => throw new ArgumentException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}."), - }; + MonadicCreatePipelineStage monadicCreatePipelineStage) + { + return ClientDCountQueryPipelineStage.MonadicCreate( + info, + continuationToken, + monadicCreatePipelineStage); + } - protected CosmosElement GetFinalResult() => this.info.IsValueAggregate ? - CosmosNumber64.Create(this.count) as CosmosElement : - CosmosObject.Create(new Dictionary - { - { this.info.DCountAlias, CosmosNumber64.Create(this.count) } - }); + protected CosmosElement GetFinalResult() + { + return this.info.IsValueAggregate ? + CosmosNumber64.Create(this.count) as CosmosElement : + CosmosObject.Create(new Dictionary + { + { this.info.DCountAlias, CosmosNumber64.Create(this.count) } + }); + } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs index b8536f6c05..4da81b6d7d 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Client.cs @@ -42,7 +42,7 @@ private ClientDistinctQueryPipelineStage( this.distinctQueryType = distinctQueryType; } - public static TryCatch MonadicCreate( + public static new TryCatch MonadicCreate( CosmosElement requestContinuation, MonadicCreatePipelineStage monadicCreatePipelineStage, DistinctQueryType distinctQueryType) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Compute.cs deleted file mode 100644 index 43489e6f30..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.Compute.cs +++ /dev/null @@ -1,206 +0,0 @@ -//------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -//------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - using Newtonsoft.Json; - - internal abstract partial class DistinctQueryPipelineStage : QueryPipelineStageBase - { - /// - /// Compute implementation of DISTINCT. - /// Here we never serialize the continuation token, but you can always retrieve it on demand with TryGetContinuationToken. - /// - private sealed class ComputeDistinctQueryPipelineStage : DistinctQueryPipelineStage - { - private static readonly string UseTryGetContinuationTokenMessage = $"Use TryGetContinuationToken"; - - private ComputeDistinctQueryPipelineStage( - DistinctMap distinctMap, - IQueryPipelineStage source) - : base(distinctMap, source) - { - } - - public static TryCatch MonadicCreate( - CosmosElement requestContinuation, - MonadicCreatePipelineStage monadicCreatePipelineStage, - DistinctQueryType distinctQueryType) - { - if (monadicCreatePipelineStage == null) - { - throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); - } - - DistinctContinuationToken distinctContinuationToken; - if (requestContinuation != null) - { - if (!DistinctContinuationToken.TryParse(requestContinuation, out distinctContinuationToken)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Invalid {nameof(DistinctContinuationToken)}: {requestContinuation}")); - } - } - else - { - distinctContinuationToken = new DistinctContinuationToken(sourceToken: null, distinctMapToken: null); - } - - TryCatch tryCreateDistinctMap = DistinctMap.TryCreate( - distinctQueryType, - distinctContinuationToken.DistinctMapToken); - if (!tryCreateDistinctMap.Succeeded) - { - return TryCatch.FromException(tryCreateDistinctMap.Exception); - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(distinctContinuationToken.SourceToken); - if (!tryCreateSource.Succeeded) - { - return TryCatch.FromException(tryCreateSource.Exception); - } - - return TryCatch.FromResult( - new ComputeDistinctQueryPipelineStage( - tryCreateDistinctMap.Result, - tryCreateSource.Result)); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (!await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - this.Current = default; - return false; - } - - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - - List distinctResults = new List(); - foreach (CosmosElement document in sourcePage.Documents) - { - if (this.distinctMap.Add(document, out UInt128 _)) - { - distinctResults.Add(document); - } - } - - QueryState queryState; - if (sourcePage.State != null) - { - DistinctContinuationToken distinctContinuationToken = new DistinctContinuationToken( - sourceToken: sourcePage.State.Value, - distinctMapToken: this.distinctMap.GetCosmosElementContinuationToken()); - queryState = new QueryState(DistinctContinuationToken.ToCosmosElement(distinctContinuationToken)); - } - else - { - queryState = null; - } - - QueryPage queryPage = new QueryPage( - documents: distinctResults, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: ComputeDistinctQueryPipelineStage.UseTryGetContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: queryState, - streaming: sourcePage.Streaming); - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - private readonly struct DistinctContinuationToken - { - private const string SourceTokenName = "SourceToken"; - private const string DistinctMapTokenName = "DistinctMapToken"; - - public DistinctContinuationToken(CosmosElement sourceToken, CosmosElement distinctMapToken) - { - this.SourceToken = sourceToken; - this.DistinctMapToken = distinctMapToken; - } - - public CosmosElement SourceToken { get; } - - public CosmosElement DistinctMapToken { get; } - - public static CosmosElement ToCosmosElement(DistinctContinuationToken distinctContinuationToken) - { - Dictionary dictionary = new Dictionary() - { - { - DistinctContinuationToken.SourceTokenName, - distinctContinuationToken.SourceToken - }, - { - DistinctContinuationToken.DistinctMapTokenName, - distinctContinuationToken.DistinctMapToken - } - }; - - return CosmosObject.Create(dictionary); - } - - public static bool TryParse( - CosmosElement requestContinuationToken, - out DistinctContinuationToken distinctContinuationToken) - { - if (requestContinuationToken == null) - { - distinctContinuationToken = default; - return false; - } - - if (!(requestContinuationToken is CosmosObject rawObject)) - { - distinctContinuationToken = default; - return false; - } - - if (!rawObject.TryGetValue(SourceTokenName, out CosmosElement sourceToken)) - { - distinctContinuationToken = default; - return false; - } - - if (!rawObject.TryGetValue(DistinctMapTokenName, out CosmosElement distinctMapToken)) - { - distinctContinuationToken = default; - return false; - } - - distinctContinuationToken = new DistinctContinuationToken(sourceToken: sourceToken, distinctMapToken: distinctMapToken); - return true; - } - } - } - } -} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs index c929f84dcb..9bf8b601da 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Distinct/DistinctQueryPipelineStage.cs @@ -35,20 +35,14 @@ protected DistinctQueryPipelineStage( } public static TryCatch MonadicCreate( - ExecutionEnvironment executionEnvironment, CosmosElement requestContinuation, MonadicCreatePipelineStage monadicCreatePipelineStage, - DistinctQueryType distinctQueryType) => executionEnvironment switch - { - ExecutionEnvironment.Client => ClientDistinctQueryPipelineStage.MonadicCreate( - requestContinuation, - monadicCreatePipelineStage, - distinctQueryType), - ExecutionEnvironment.Compute => ComputeDistinctQueryPipelineStage.MonadicCreate( - requestContinuation, - monadicCreatePipelineStage, - distinctQueryType), - _ => throw new ArgumentException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}."), - }; + DistinctQueryType distinctQueryType) + { + return ClientDistinctQueryPipelineStage.MonadicCreate( + requestContinuation, + monadicCreatePipelineStage, + distinctQueryType); + } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/ExecutionEnvironment.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/ExecutionEnvironment.cs deleted file mode 100644 index 2daa905243..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/ExecutionEnvironment.cs +++ /dev/null @@ -1,22 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline -{ - /// - /// Environment the query is going to be executed on. - /// - internal enum ExecutionEnvironment - { - /// - /// Query is being executed on a 3rd party client. - /// - Client, - - /// - /// Query is being executed on the compute gateway. - /// - Compute, - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs index 6ed1804abf..f469472a3f 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Client.cs @@ -27,7 +27,7 @@ private ClientGroupByQueryPipelineStage( { } - public static TryCatch MonadicCreate( + public static new TryCatch MonadicCreate( CosmosElement requestContinuation, MonadicCreatePipelineStage monadicCreatePipelineStage, IReadOnlyList aggregates, diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Compute.cs deleted file mode 100644 index 2692e9ba6b..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.Compute.cs +++ /dev/null @@ -1,253 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.GroupBy -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Metrics; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Aggregate; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class GroupByQueryPipelineStage : QueryPipelineStageBase - { - private sealed class ComputeGroupByQueryPipelineStage : GroupByQueryPipelineStage - { - private const string DoneReadingGroupingsContinuationToken = "DONE"; - private static readonly CosmosElement DoneCosmosElementToken = CosmosString.Create(DoneReadingGroupingsContinuationToken); - - private static readonly IReadOnlyList EmptyResults = new List().AsReadOnly(); - private static readonly IReadOnlyDictionary EmptyQueryMetrics = new Dictionary(); - - private ComputeGroupByQueryPipelineStage( - IQueryPipelineStage source, - GroupingTable groupingTable, - int pageSize) - : base(source, groupingTable, pageSize) - { - } - - public static TryCatch MonadicCreate( - CosmosElement requestContinuation, - MonadicCreatePipelineStage monadicCreatePipelineStage, - IReadOnlyList aggregates, - IReadOnlyDictionary groupByAliasToAggregateType, - IReadOnlyList orderedAliases, - bool hasSelectValue, - int pageSize) - { - GroupByContinuationToken groupByContinuationToken; - if (requestContinuation != null) - { - if (!GroupByContinuationToken.TryParse(requestContinuation, out groupByContinuationToken)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Invalid {nameof(GroupByContinuationToken)}: '{requestContinuation}'")); - } - } - else - { - groupByContinuationToken = new GroupByContinuationToken( - groupingTableContinuationToken: null, - sourceContinuationToken: null); - } - - TryCatch tryCreateSource; - if ((groupByContinuationToken.SourceContinuationToken is CosmosString sourceContinuationToken) - && (sourceContinuationToken.Value == ComputeGroupByQueryPipelineStage.DoneReadingGroupingsContinuationToken)) - { - tryCreateSource = TryCatch.FromResult(EmptyQueryPipelineStage.Singleton); - } - else - { - tryCreateSource = monadicCreatePipelineStage(groupByContinuationToken.SourceContinuationToken); - } - - if (!tryCreateSource.Succeeded) - { - return TryCatch.FromException(tryCreateSource.Exception); - } - - TryCatch tryCreateGroupingTable = GroupingTable.TryCreateFromContinuationToken( - aggregates, - groupByAliasToAggregateType, - orderedAliases, - hasSelectValue, - groupByContinuationToken.GroupingTableContinuationToken); - - if (!tryCreateGroupingTable.Succeeded) - { - return TryCatch.FromException(tryCreateGroupingTable.Exception); - } - - return TryCatch.FromResult( - new ComputeGroupByQueryPipelineStage( - tryCreateSource.Result, - tryCreateGroupingTable.Result, - pageSize)); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.returnedLastPage) - { - this.Current = default; - return false; - } - - // Draining GROUP BY is broken down into two stages: - QueryPage queryPage; - if (await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - // Stage 1: - // Drain the groupings fully from all continuation and all partitions - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - - this.AggregateGroupings(sourcePage.Documents); - - // We need to give empty pages until the results are fully drained. - CosmosElement sourceContinuationToken = sourcePage.State == null ? DoneCosmosElementToken : sourcePage.State.Value; - GroupByContinuationToken groupByContinuationToken = new GroupByContinuationToken( - groupingTableContinuationToken: this.groupingTable.GetCosmosElementContinuationToken(), - sourceContinuationToken: sourceContinuationToken); - QueryState state = new QueryState(GroupByContinuationToken.ToCosmosElement(groupByContinuationToken)); - - queryPage = new QueryPage( - documents: EmptyResults, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: null, - additionalHeaders: sourcePage.AdditionalHeaders, - state: state, - streaming: sourcePage.Streaming); - } - else - { - // Stage 2: - // Emit the results from the grouping table page by page - IReadOnlyList results = this.groupingTable.Drain(this.pageSize); - - QueryState state; - if (this.groupingTable.IsDone) - { - state = default; - this.returnedLastPage = true; - } - else - { - GroupByContinuationToken groupByContinuationToken = new GroupByContinuationToken( - groupingTableContinuationToken: this.groupingTable.GetCosmosElementContinuationToken(), - sourceContinuationToken: DoneCosmosElementToken); - state = new QueryState(GroupByContinuationToken.ToCosmosElement(groupByContinuationToken)); - } - - queryPage = new QueryPage( - documents: results, - requestCharge: default, - activityId: default, - cosmosQueryExecutionInfo: default, - distributionPlanSpec: default, - disallowContinuationTokenMessage: default, - additionalHeaders: default, - state: state, - streaming: null); - } - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - private readonly struct GroupByContinuationToken - { - private static class PropertyNames - { - public const string SourceToken = "SourceToken"; - public const string GroupingTableContinuationToken = "GroupingTableContinuationToken"; - } - - public GroupByContinuationToken( - CosmosElement groupingTableContinuationToken, - CosmosElement sourceContinuationToken) - { - this.GroupingTableContinuationToken = groupingTableContinuationToken; - this.SourceContinuationToken = sourceContinuationToken; - } - - public CosmosElement GroupingTableContinuationToken { get; } - - public CosmosElement SourceContinuationToken { get; } - - public static CosmosElement ToCosmosElement(GroupByContinuationToken groupByContinuationToken) - { - Dictionary dictionary = new Dictionary() - { - { - GroupByContinuationToken.PropertyNames.SourceToken, - groupByContinuationToken.SourceContinuationToken - }, - { - GroupByContinuationToken.PropertyNames.GroupingTableContinuationToken, - groupByContinuationToken.GroupingTableContinuationToken - }, - }; - - return CosmosObject.Create(dictionary); - } - - public static bool TryParse(CosmosElement value, out GroupByContinuationToken groupByContinuationToken) - { - if (!(value is CosmosObject groupByContinuationTokenObject)) - { - groupByContinuationToken = default; - return false; - } - - if (!groupByContinuationTokenObject.TryGetValue( - GroupByContinuationToken.PropertyNames.GroupingTableContinuationToken, - out CosmosElement groupingTableContinuationToken)) - { - groupByContinuationToken = default; - return false; - } - - if (!groupByContinuationTokenObject.TryGetValue( - GroupByContinuationToken.PropertyNames.SourceToken, - out CosmosElement sourceContinuationToken)) - { - groupByContinuationToken = default; - return false; - } - - groupByContinuationToken = new GroupByContinuationToken( - groupingTableContinuationToken: groupingTableContinuationToken, - sourceContinuationToken: sourceContinuationToken); - return true; - } - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs index 58e400b72e..7368239429 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/GroupBy/GroupByQueryPipelineStage.cs @@ -62,33 +62,23 @@ protected GroupByQueryPipelineStage( } public static TryCatch MonadicCreate( - ExecutionEnvironment executionEnvironment, CosmosElement continuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage, IReadOnlyList aggregates, IReadOnlyDictionary groupByAliasToAggregateType, IReadOnlyList orderedAliases, bool hasSelectValue, - int pageSize) => executionEnvironment switch - { - ExecutionEnvironment.Client => ClientGroupByQueryPipelineStage.MonadicCreate( - continuationToken, - monadicCreatePipelineStage, - aggregates, - groupByAliasToAggregateType, - orderedAliases, - hasSelectValue, - pageSize), - ExecutionEnvironment.Compute => ComputeGroupByQueryPipelineStage.MonadicCreate( - continuationToken, - monadicCreatePipelineStage, - aggregates, - groupByAliasToAggregateType, - orderedAliases, - hasSelectValue, - pageSize), - _ => throw new ArgumentException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}"), - }; + int pageSize) + { + return ClientGroupByQueryPipelineStage.MonadicCreate( + continuationToken, + monadicCreatePipelineStage, + aggregates, + groupByAliasToAggregateType, + orderedAliases, + hasSelectValue, + pageSize); + } protected void AggregateGroupings(IReadOnlyList cosmosElements) { diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs index 93008125fb..1ae85407cd 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs @@ -26,7 +26,6 @@ namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline internal static class PipelineFactory { public static TryCatch MonadicCreate( - ExecutionEnvironment executionEnvironment, IDocumentContainer documentContainer, SqlQuerySpec sqlQuerySpec, IReadOnlyList targetRanges, @@ -100,7 +99,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => AggregateQueryPipelineStage.MonadicCreate( - executionEnvironment, queryInfo.Aggregates, queryInfo.GroupByAliasToAggregateType, queryInfo.GroupByAliases, @@ -113,7 +111,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => DistinctQueryPipelineStage.MonadicCreate( - executionEnvironment, continuationToken, monadicCreateSourceStage, queryInfo.DistinctType); @@ -123,7 +120,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => GroupByQueryPipelineStage.MonadicCreate( - executionEnvironment, continuationToken, monadicCreateSourceStage, queryInfo.Aggregates, @@ -137,7 +133,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => SkipQueryPipelineStage.MonadicCreate( - executionEnvironment, queryInfo.Offset.Value, continuationToken, monadicCreateSourceStage); @@ -147,7 +142,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => TakeQueryPipelineStage.MonadicCreateLimitStage( - executionEnvironment, queryInfo.Limit.Value, continuationToken, monadicCreateSourceStage); @@ -157,7 +151,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => TakeQueryPipelineStage.MonadicCreateTopStage( - executionEnvironment, queryInfo.Top.Value, continuationToken, monadicCreateSourceStage); @@ -167,7 +160,6 @@ public static TryCatch MonadicCreate( { MonadicCreatePipelineStage monadicCreateSourceStage = monadicCreatePipelineStage; monadicCreatePipelineStage = (continuationToken) => DCountQueryPipelineStage.MonadicCreate( - executionEnvironment, queryInfo.DCountInfo, continuationToken, monadicCreateSourceStage); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Client.cs index e04b75207d..f23744e96c 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Client.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Client.cs @@ -28,7 +28,7 @@ private ClientSkipQueryPipelineStage( // Work is done in base constructor. } - public static TryCatch MonadicCreate( + public static new TryCatch MonadicCreate( int offsetCount, CosmosElement continuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Compute.cs deleted file mode 100644 index 8acb424510..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.Compute.cs +++ /dev/null @@ -1,222 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Skip -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.CosmosElements.Numbers; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class SkipQueryPipelineStage : QueryPipelineStageBase - { - private sealed class ComputeSkipQueryPipelineStage : SkipQueryPipelineStage - { - private ComputeSkipQueryPipelineStage(IQueryPipelineStage source, long skipCount) - : base(source, skipCount) - { - // Work is done in base constructor. - } - - public static TryCatch MonadicCreate( - int offsetCount, - CosmosElement continuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) - { - if (monadicCreatePipelineStage == null) - { - throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); - } - - OffsetContinuationToken offsetContinuationToken; - if (continuationToken != null) - { - (bool parsed, OffsetContinuationToken parsedToken) = OffsetContinuationToken.TryParse(continuationToken); - if (!parsed) - { - return TryCatch.FromException( - new MalformedContinuationTokenException($"Invalid {nameof(SkipQueryPipelineStage)}: {continuationToken}.")); - } - - offsetContinuationToken = parsedToken; - } - else - { - offsetContinuationToken = new OffsetContinuationToken(offsetCount, null); - } - - if (offsetContinuationToken.Offset > offsetCount) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - "offset count in continuation token can not be greater than the offsetcount in the query.")); - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(offsetContinuationToken.SourceToken); - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - IQueryPipelineStage stage = new ComputeSkipQueryPipelineStage( - tryCreateSource.Result, - offsetContinuationToken.Offset); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (!await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - this.Current = default; - return false; - } - - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - - // Skip the documents but keep all the other headers - IReadOnlyList documentsAfterSkip = sourcePage.Documents.Skip(this.skipCount).ToList(); - - int numberOfDocumentsSkipped = sourcePage.Documents.Count() - documentsAfterSkip.Count(); - this.skipCount -= numberOfDocumentsSkipped; - - QueryState state; - if (sourcePage.State == null) - { - state = default; - } - else - { - OffsetContinuationToken offsetContinuationToken = new OffsetContinuationToken( - offset: this.skipCount, - sourceToken: sourcePage.State.Value); - - state = new QueryState(OffsetContinuationToken.ToCosmosElement(offsetContinuationToken)); - } - - QueryPage queryPage = new QueryPage( - documents: documentsAfterSkip, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: state, - streaming: sourcePage.Streaming); - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - /// - /// A OffsetContinuationToken is a composition of a source continuation token and how many items to skip from that source. - /// - private readonly struct OffsetContinuationToken - { - private static class ProperytNames - { - public const string SkipCountProperty = "SkipCount"; - public const string SourceTokenProperty = "SourceToken"; - } - - /// - /// Initializes a new instance of the OffsetContinuationToken struct. - /// - /// The number of items to skip in the query. - /// The continuation token for the source component of the query. - public OffsetContinuationToken(long offset, CosmosElement sourceToken) - { - if ((offset < 0) || (offset > int.MaxValue)) - { - throw new ArgumentOutOfRangeException(nameof(offset)); - } - - this.Offset = (int)offset; - this.SourceToken = sourceToken; - } - - /// - /// The number of items to skip in the query. - /// - public int Offset - { - get; - } - - /// - /// Gets the continuation token for the source component of the query. - /// - public CosmosElement SourceToken - { - get; - } - - public static CosmosElement ToCosmosElement(OffsetContinuationToken offsetContinuationToken) - { - Dictionary dictionary = new Dictionary() - { - { - OffsetContinuationToken.ProperytNames.SkipCountProperty, - CosmosNumber64.Create(offsetContinuationToken.Offset) - }, - { - OffsetContinuationToken.ProperytNames.SourceTokenProperty, - offsetContinuationToken.SourceToken - } - }; - - return CosmosObject.Create(dictionary); - } - - public static (bool parsed, OffsetContinuationToken offsetContinuationToken) TryParse(CosmosElement value) - { - if (value == null) - { - return (false, default); - } - - if (!(value is CosmosObject cosmosObject)) - { - return (false, default); - } - - if (!cosmosObject.TryGetValue(OffsetContinuationToken.ProperytNames.SkipCountProperty, out CosmosNumber offset)) - { - return (false, default); - } - - if (!cosmosObject.TryGetValue(OffsetContinuationToken.ProperytNames.SourceTokenProperty, out CosmosElement sourceToken)) - { - return (false, default); - } - - return (true, new OffsetContinuationToken(Number64.ToLong(offset.Value), sourceToken)); - } - } - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.cs index adf84a9fb6..d1ce26079b 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Skip/SkipQueryPipelineStage.cs @@ -31,23 +31,14 @@ protected SkipQueryPipelineStage( } public static TryCatch MonadicCreate( - ExecutionEnvironment executionEnvironment, int offsetCount, CosmosElement continuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) { - TryCatch tryCreate = executionEnvironment switch - { - ExecutionEnvironment.Client => ClientSkipQueryPipelineStage.MonadicCreate( - offsetCount, - continuationToken, - monadicCreatePipelineStage), - ExecutionEnvironment.Compute => ComputeSkipQueryPipelineStage.MonadicCreate( + TryCatch tryCreate = ClientSkipQueryPipelineStage.MonadicCreate( offsetCount, continuationToken, - monadicCreatePipelineStage), - _ => throw new ArgumentException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}"), - }; + monadicCreatePipelineStage); return tryCreate; } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Compute.cs deleted file mode 100644 index fd6e0bd5ff..0000000000 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Compute.cs +++ /dev/null @@ -1,224 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// ------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Query.Core.Pipeline.Take -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.CosmosElements.Numbers; - using Microsoft.Azure.Cosmos.Query.Core.Exceptions; - using Microsoft.Azure.Cosmos.Query.Core.Monads; - using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; - using Microsoft.Azure.Cosmos.Tracing; - - internal abstract partial class TakeQueryPipelineStage : QueryPipelineStageBase - { - private sealed class ComputeTakeQueryPipelineStage : TakeQueryPipelineStage - { - private ComputeTakeQueryPipelineStage( - IQueryPipelineStage source, - int takeCount) - : base(source, takeCount) - { - // Work is done in the base class. - } - - public static TryCatch MonadicCreateLimitStage( - int takeCount, - CosmosElement requestContinuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) => ComputeTakeQueryPipelineStage.MonadicCreate( - takeCount, - requestContinuationToken, - monadicCreatePipelineStage); - - public static TryCatch MonadicCreateTopStage( - int takeCount, - CosmosElement requestContinuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) => ComputeTakeQueryPipelineStage.MonadicCreate( - takeCount, - requestContinuationToken, - monadicCreatePipelineStage); - - private static TryCatch MonadicCreate( - int takeCount, - CosmosElement requestContinuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) - { - if (takeCount < 0) - { - throw new ArgumentException($"{nameof(takeCount)}: {takeCount} must be a non negative number."); - } - - if (monadicCreatePipelineStage == null) - { - throw new ArgumentNullException(nameof(monadicCreatePipelineStage)); - } - - TakeContinuationToken takeContinuationToken; - if (requestContinuationToken != null) - { - if (!TakeContinuationToken.TryParse(requestContinuationToken, out takeContinuationToken)) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"Malformed {nameof(TakeContinuationToken)}: {requestContinuationToken}.")); - } - } - else - { - takeContinuationToken = new TakeContinuationToken(takeCount, sourceToken: null); - } - - if (takeContinuationToken.TakeCount > takeCount) - { - return TryCatch.FromException( - new MalformedContinuationTokenException( - $"{nameof(TakeContinuationToken.TakeCount)} in {nameof(TakeContinuationToken)}: {requestContinuationToken}: {takeContinuationToken.TakeCount} can not be greater than the limit count in the query: {takeCount}.")); - } - - TryCatch tryCreateSource = monadicCreatePipelineStage(takeContinuationToken.SourceToken); - if (tryCreateSource.Failed) - { - return tryCreateSource; - } - - IQueryPipelineStage stage = new ComputeTakeQueryPipelineStage( - tryCreateSource.Result, - takeContinuationToken.TakeCount); - - return TryCatch.FromResult(stage); - } - - public override async ValueTask MoveNextAsync(ITrace trace, CancellationToken cancellationToken) - { - cancellationToken.ThrowIfCancellationRequested(); - - if (trace == null) - { - throw new ArgumentNullException(nameof(trace)); - } - - if (this.ReturnedFinalPage || !await this.inputStage.MoveNextAsync(trace, cancellationToken)) - { - this.Current = default; - this.takeCount = 0; - return false; - } - - TryCatch tryGetSourcePage = this.inputStage.Current; - if (tryGetSourcePage.Failed) - { - this.Current = tryGetSourcePage; - return true; - } - - QueryPage sourcePage = tryGetSourcePage.Result; - - List takedDocuments = sourcePage.Documents.Take(this.takeCount).ToList(); - this.takeCount -= takedDocuments.Count; - - QueryState queryState; - if (sourcePage.State != null) - { - TakeContinuationToken takeContinuationToken = new TakeContinuationToken( - takeCount: this.takeCount, - sourceToken: sourcePage.State.Value); - queryState = new QueryState(TakeContinuationToken.ToCosmosElement(takeContinuationToken)); - } - else - { - queryState = default; - } - - QueryPage queryPage = new QueryPage( - documents: takedDocuments, - requestCharge: sourcePage.RequestCharge, - activityId: sourcePage.ActivityId, - cosmosQueryExecutionInfo: sourcePage.CosmosQueryExecutionInfo, - distributionPlanSpec: default, - disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, - additionalHeaders: sourcePage.AdditionalHeaders, - state: queryState, - streaming: sourcePage.Streaming); - - this.Current = TryCatch.FromResult(queryPage); - return true; - } - - private readonly struct TakeContinuationToken - { - public static class PropertyNames - { - public const string SourceToken = "SourceToken"; - public const string TakeCount = "TakeCount"; - } - - public TakeContinuationToken(long takeCount, CosmosElement sourceToken) - { - if ((takeCount < 0) || (takeCount > int.MaxValue)) - { - throw new ArgumentException($"{nameof(takeCount)} must be a non negative number."); - } - - this.TakeCount = (int)takeCount; - this.SourceToken = sourceToken; - } - - public int TakeCount { get; } - - public CosmosElement SourceToken { get; } - - public static CosmosElement ToCosmosElement(TakeContinuationToken takeContinuationToken) - { - Dictionary dictionary = new Dictionary() - { - { - TakeContinuationToken.PropertyNames.SourceToken, - takeContinuationToken.SourceToken - }, - { - TakeContinuationToken.PropertyNames.TakeCount, - CosmosNumber64.Create(takeContinuationToken.TakeCount) - }, - }; - - return CosmosObject.Create(dictionary); - } - - public static bool TryParse(CosmosElement value, out TakeContinuationToken takeContinuationToken) - { - if (value == null) - { - throw new ArgumentNullException(nameof(value)); - } - - if (!(value is CosmosObject continuationToken)) - { - takeContinuationToken = default; - return false; - } - - if (!continuationToken.TryGetValue(TakeContinuationToken.PropertyNames.TakeCount, out CosmosNumber takeCount)) - { - takeContinuationToken = default; - return false; - } - - if (!continuationToken.TryGetValue(TakeContinuationToken.PropertyNames.SourceToken, out CosmosElement sourceToken)) - { - takeContinuationToken = default; - return false; - } - - takeContinuationToken = new TakeContinuationToken(Number64.ToLong(takeCount.Value), sourceToken); - return true; - } - } - } - } -} From 122a4f4f75c267d4e1f90ff039da672bc3d659c3 Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Tue, 25 Jun 2024 11:04:23 -0700 Subject: [PATCH 2/3] Add changes that were missed in the last commit --- .../Take/TakeQueryPipelineStage.Client.cs | 4 +- .../Pipeline/Take/TakeQueryPipelineStage.cs | 40 +++++-------- .../src/Query/v3Query/QueryIterator.cs | 57 +++++++------------ .../src/RequestOptions/QueryRequestOptions.cs | 2 - .../Query/QueryTestsBase.cs | 1 - ...misticDirectExecutionQueryBaselineTests.cs | 1 - .../AggregateQueryPipelineStageTests.cs | 5 -- .../Pipeline/DCountQueryPipelineStageTests.cs | 12 ---- .../DistinctQueryPipelineStageTests.cs | 34 +++++------ .../Query/Pipeline/FactoryTests.cs | 1 - .../Query/Pipeline/FullPipelineTests.cs | 2 - .../GroupByQueryPipelineStageTests.cs | 3 - .../Pipeline/SkipQueryPipelineStageTests.cs | 3 - .../Pipeline/TakeQueryPipelineStageTests.cs | 4 -- .../Query/SubpartitionTests.cs | 1 - .../Tracing/TraceWriterBaselineTests.cs | 1 - 16 files changed, 52 insertions(+), 119 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Client.cs index d7d0dea1b4..712642e95a 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Client.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.Client.cs @@ -31,7 +31,7 @@ private ClientTakeQueryPipelineStage( this.takeEnum = takeEnum; } - public static TryCatch MonadicCreateLimitStage( + public static new TryCatch MonadicCreateLimitStage( int limitCount, CosmosElement requestContinuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) @@ -101,7 +101,7 @@ public static TryCatch MonadicCreateLimitStage( return TryCatch.FromResult(stage); } - public static TryCatch MonadicCreateTopStage( + public static new TryCatch MonadicCreateTopStage( int topCount, CosmosElement requestContinuationToken, MonadicCreatePipelineStage monadicCreatePipelineStage) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.cs index 5dea48e840..ab37bfe4a6 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/Take/TakeQueryPipelineStage.cs @@ -24,37 +24,25 @@ protected TakeQueryPipelineStage( } public static TryCatch MonadicCreateLimitStage( - ExecutionEnvironment executionEnvironment, int limitCount, CosmosElement requestContinuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) => executionEnvironment switch - { - ExecutionEnvironment.Client => ClientTakeQueryPipelineStage.MonadicCreateLimitStage( - limitCount, - requestContinuationToken, - monadicCreatePipelineStage), - ExecutionEnvironment.Compute => ComputeTakeQueryPipelineStage.MonadicCreateLimitStage( - limitCount, - requestContinuationToken, - monadicCreatePipelineStage), - _ => throw new ArgumentOutOfRangeException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}."), - }; + MonadicCreatePipelineStage monadicCreatePipelineStage) + { + return ClientTakeQueryPipelineStage.MonadicCreateLimitStage( + limitCount, + requestContinuationToken, + monadicCreatePipelineStage); + } public static TryCatch MonadicCreateTopStage( - ExecutionEnvironment executionEnvironment, int limitCount, CosmosElement requestContinuationToken, - MonadicCreatePipelineStage monadicCreatePipelineStage) => executionEnvironment switch - { - ExecutionEnvironment.Client => ClientTakeQueryPipelineStage.MonadicCreateTopStage( - limitCount, - requestContinuationToken, - monadicCreatePipelineStage), - ExecutionEnvironment.Compute => ComputeTakeQueryPipelineStage.MonadicCreateTopStage( - limitCount, - requestContinuationToken, - monadicCreatePipelineStage), - _ => throw new ArgumentOutOfRangeException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}."), - }; + MonadicCreatePipelineStage monadicCreatePipelineStage) + { + return ClientTakeQueryPipelineStage.MonadicCreateTopStage( + limitCount, + requestContinuationToken, + monadicCreatePipelineStage); + } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs b/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs index abfe5fbb44..e9d8da0ab5 100644 --- a/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs @@ -94,42 +94,30 @@ public static QueryIterator Create( resourceType: resourceType); DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer); - CosmosElement requestContinuationToken; - switch (queryRequestOptions.ExecutionEnvironment.GetValueOrDefault(ExecutionEnvironment.Client)) + CosmosElement requestContinuationToken; + if (continuationToken != null) { - case ExecutionEnvironment.Client: - if (continuationToken != null) - { - TryCatch tryParse = CosmosElement.Monadic.Parse(continuationToken); - if (tryParse.Failed) - { - return new QueryIterator( - cosmosQueryContext, - new FaultedQueryPipelineStage( - new MalformedContinuationTokenException( - message: $"Malformed Continuation Token: {continuationToken}", - innerException: tryParse.Exception)), - queryRequestOptions.CosmosSerializationFormatOptions, - queryRequestOptions, - clientContext, - correlatedActivityId, - containerCore); - } - - requestContinuationToken = tryParse.Result; - } - else - { - requestContinuationToken = null; - } - break; - - case ExecutionEnvironment.Compute: - requestContinuationToken = queryRequestOptions.CosmosElementContinuationToken; - break; + TryCatch tryParse = CosmosElement.Monadic.Parse(continuationToken); + if (tryParse.Failed) + { + return new QueryIterator( + cosmosQueryContext, + new FaultedQueryPipelineStage( + new MalformedContinuationTokenException( + message: $"Malformed Continuation Token: {continuationToken}", + innerException: tryParse.Exception)), + queryRequestOptions.CosmosSerializationFormatOptions, + queryRequestOptions, + clientContext, + correlatedActivityId, + containerCore); + } - default: - throw new ArgumentOutOfRangeException($"Unknown {nameof(ExecutionEnvironment)}: {queryRequestOptions.ExecutionEnvironment.Value}."); + requestContinuationToken = tryParse.Result; + } + else + { + requestContinuationToken = null; } CosmosQueryExecutionContextFactory.InputParameters inputParameters = new CosmosQueryExecutionContextFactory.InputParameters( @@ -142,7 +130,6 @@ public static QueryIterator Create( partitionKey: queryRequestOptions.PartitionKey, properties: queryRequestOptions.Properties, partitionedQueryExecutionInfo: partitionedQueryExecutionInfo, - executionEnvironment: queryRequestOptions.ExecutionEnvironment, returnResultsInDeterministicOrder: queryRequestOptions.ReturnResultsInDeterministicOrder, enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution, isNonStreamingOrderByQueryFeatureDisabled: queryRequestOptions.IsNonStreamingOrderByQueryFeatureDisabled, diff --git a/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs b/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs index bbb019f08b..05bc8274e9 100644 --- a/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs +++ b/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs @@ -183,8 +183,6 @@ public ConsistencyLevel? ConsistencyLevel internal SupportedSerializationFormats? SupportedSerializationFormats { get; set; } - internal ExecutionEnvironment? ExecutionEnvironment { get; set; } - internal bool? ReturnResultsInDeterministicOrder { get; set; } internal TestInjections TestSettings { get; set; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs index 8af5eeafef..1c20a3bb0e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs @@ -593,7 +593,6 @@ internal static async Task> QueryWithCosmosElementContinuationTokenAsync TestSettings = queryRequestOptions.TestSettings, }; - computeRequestOptions.ExecutionEnvironment = ExecutionEnvironment.Compute; computeRequestOptions.CosmosElementContinuationToken = continuationToken; using (FeedIteratorInternal itemQuery = (FeedIteratorInternal)container.GetItemQueryIterator( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs index 267ea76b0e..05e8867740 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OptimisticDirectExecutionQueryBaselineTests.cs @@ -1019,7 +1019,6 @@ public override OptimisticDirectExecutionTestOutput ExecuteTest(OptimisticDirect partitionKey: input.PartitionKeyValue, properties: new Dictionary() { { "x-ms-query-partitionkey-definition", input.PartitionKeyDefinition } }, partitionedQueryExecutionInfo: null, - executionEnvironment: null, returnResultsInDeterministicOrder: null, enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution, isNonStreamingOrderByQueryFeatureDisabled: false, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggregateQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggregateQueryPipelineStageTests.cs index 61dba34d02..0df6d099c6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggregateQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggregateQueryPipelineStageTests.cs @@ -34,7 +34,6 @@ public async Task SinglePageAsync() List elements = await AggregateQueryPipelineStageTests.CreateAndDrain( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, aggregates: new List() { AggregateOperator.Sum }, aliasToAggregateType: new Dictionary() { { "$1", AggregateOperator.Sum } }, orderedAliases: new List() { "$1" }, @@ -58,7 +57,6 @@ public async Task MultiplePagesAsync() List elements = await AggregateQueryPipelineStageTests.CreateAndDrain( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, aggregates: new List() { AggregateOperator.Sum }, aliasToAggregateType: new Dictionary() { { "$1", AggregateOperator.Sum } }, orderedAliases: new List() { "$1" }, @@ -82,7 +80,6 @@ public async Task UndefinedSinglePageAsync() List elements = await AggregateQueryPipelineStageTests.CreateAndDrain( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, aggregates: new List() { AggregateOperator.Sum }, aliasToAggregateType: new Dictionary() { { "$1", AggregateOperator.Sum } }, orderedAliases: new List() { "$1" }, @@ -95,7 +92,6 @@ public async Task UndefinedSinglePageAsync() private static async Task> CreateAndDrain( IReadOnlyList> pages, - ExecutionEnvironment executionEnvironment, IReadOnlyList aggregates, IReadOnlyDictionary aliasToAggregateType, IReadOnlyList orderedAliases, @@ -105,7 +101,6 @@ private static async Task> CreateAndDrain( IQueryPipelineStage source = new MockQueryPipelineStage(pages); TryCatch tryCreateAggregateQueryPipelineStage = AggregateQueryPipelineStage.MonadicCreate( - executionEnvironment: executionEnvironment, aggregates: aggregates, aliasToAggregateType: aliasToAggregateType, orderedAliases: orderedAliases, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DCountQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DCountQueryPipelineStageTests.cs index 93dc5b9459..0f4175bd29 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DCountQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DCountQueryPipelineStageTests.cs @@ -87,7 +87,6 @@ private static async Task Run( { List elementsCompute = await CreateAndDrainAsync( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, continuationToken: null, distinctQueryType: distinctQueryType, dcountAlias: dcountAlias); @@ -96,7 +95,6 @@ private static async Task Run( List elementsClient = await CreateAndDrainWithoutStateAsync( pages: pages, - executionEnvironment: ExecutionEnvironment.Client, continuationToken: null, distinctQueryType: distinctQueryType, dcountAlias: dcountAlias); @@ -133,21 +131,18 @@ private static void Validate(int expectedCount, string dcountAlias, IReadOnlyLis private static async Task> CreateAndDrainAsync( PageList pages, - ExecutionEnvironment executionEnvironment, CosmosElement continuationToken, DistinctQueryType distinctQueryType, string dcountAlias) { List resultWithoutState = await CreateAndDrainWithoutStateAsync( pages: pages, - executionEnvironment: executionEnvironment, continuationToken: continuationToken, distinctQueryType: distinctQueryType, dcountAlias: dcountAlias); List resultWithState = await CreateAndDrainWithStateAsync( pages: pages, - executionEnvironment: executionEnvironment, continuationToken: continuationToken, distinctQueryType: distinctQueryType, dcountAlias: dcountAlias); @@ -159,7 +154,6 @@ private static async Task> CreateAndDrainAsync( private static async Task> CreateAndDrainWithoutStateAsync( PageList pages, - ExecutionEnvironment executionEnvironment, CosmosElement continuationToken, DistinctQueryType distinctQueryType, string dcountAlias) @@ -167,7 +161,6 @@ private static async Task> CreateAndDrainWithoutStateAsync( List elements = new List(); IQueryPipelineStage stage = Create( pages: pages, - executionEnvironment: executionEnvironment, requestContinuationToken: continuationToken, distinctQueryType: distinctQueryType, dcountAlias: dcountAlias); @@ -183,7 +176,6 @@ private static async Task> CreateAndDrainWithoutStateAsync( private static async Task> CreateAndDrainWithStateAsync( PageList pages, - ExecutionEnvironment executionEnvironment, CosmosElement continuationToken, DistinctQueryType distinctQueryType, string dcountAlias) @@ -195,7 +187,6 @@ private static async Task> CreateAndDrainWithStateAsync( { IQueryPipelineStage stage = Create( pages: pages, - executionEnvironment: executionEnvironment, requestContinuationToken: state, distinctQueryType: distinctQueryType, dcountAlias: dcountAlias); @@ -217,7 +208,6 @@ private static async Task> CreateAndDrainWithStateAsync( private static IQueryPipelineStage Create( PageList pages, - ExecutionEnvironment executionEnvironment, CosmosElement requestContinuationToken, DistinctQueryType distinctQueryType, string dcountAlias) @@ -227,13 +217,11 @@ private static IQueryPipelineStage Create( MonadicCreatePipelineStage createDistinctQueryPipelineStage = (CosmosElement continuationToken) => DistinctQueryPipelineStage.MonadicCreate( - executionEnvironment: executionEnvironment, requestContinuation: continuationToken, distinctQueryType: distinctQueryType, monadicCreatePipelineStage: source); TryCatch tryCreateDCountQueryPipelineStage = DCountQueryPipelineStage.MonadicCreate( - executionEnvironment: executionEnvironment, continuationToken: requestContinuationToken, info: new DCountInfo { DCountAlias = dcountAlias }, monadicCreatePipelineStage: createDistinctQueryPipelineStage); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DistinctQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DistinctQueryPipelineStageTests.cs index f71559e11e..e90fcb01c0 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DistinctQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/DistinctQueryPipelineStageTests.cs @@ -187,37 +187,31 @@ private static async Task RunTests(IEnumerable elements = await DistinctQueryPipelineStageTests.CreateAndDrainAsync( - pages: pages, - executionEnvironment: env, - continuationToken: null, - distinctQueryType: DistinctQueryType.Unordered); - - List actual = elements - .Select(value => value.ToString()) - .ToList(); - - List expected = testCase.Expected - .Select(value => value.ToString()) - .ToList(); - - CollectionAssert.AreEquivalent(expected, actual); - } + IEnumerable elements = await DistinctQueryPipelineStageTests.CreateAndDrainAsync( + pages: pages, + continuationToken: null, + distinctQueryType: DistinctQueryType.Unordered); + + List actual = elements + .Select(value => value.ToString()) + .ToList(); + + List expected = testCase.Expected + .Select(value => value.ToString()) + .ToList(); + + CollectionAssert.AreEquivalent(expected, actual); } } private static async Task> CreateAndDrainAsync( IReadOnlyList> pages, - ExecutionEnvironment executionEnvironment, CosmosElement continuationToken, DistinctQueryType distinctQueryType) { IQueryPipelineStage source = new MockQueryPipelineStage(pages); TryCatch tryCreateDistinctQueryPipelineStage = DistinctQueryPipelineStage.MonadicCreate( - executionEnvironment: executionEnvironment, requestContinuation: continuationToken, distinctQueryType: distinctQueryType, monadicCreatePipelineStage: (CosmosElement continuationToken) => TryCatch.FromResult(source)); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FactoryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FactoryTests.cs index bc0c4c94a3..0bf2850f94 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FactoryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FactoryTests.cs @@ -23,7 +23,6 @@ public void TestCreate() Mock mockDocumentContainer = new Mock(); TryCatch monadicCreatePipeline = PipelineFactory.MonadicCreate( - ExecutionEnvironment.Compute, documentContainer: mockDocumentContainer.Object, sqlQuerySpec: new SqlQuerySpec("SELECT * FROM c"), targetRanges: new List() { FeedRangeEpk.FullRange }, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs index 3e56b607bd..d2df6cc192 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs @@ -340,7 +340,6 @@ private async Task TestPageSizeAsync(string query, int expectedPageSize, int exp partitionKey: partitionKeyValue, properties: new Dictionary() { { "x-ms-query-partitionkey-definition", partitionKeyDefinition } }, partitionedQueryExecutionInfo: null, - executionEnvironment: null, returnResultsInDeterministicOrder: null, enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution, isNonStreamingOrderByQueryFeatureDisabled: queryRequestOptions.IsNonStreamingOrderByQueryFeatureDisabled, @@ -604,7 +603,6 @@ private static async Task CreatePipelineAsync( IReadOnlyList feedRanges = await documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default); TryCatch tryCreatePipeline = PipelineFactory.MonadicCreate( - ExecutionEnvironment.Client, documentContainer, new SqlQuerySpec(query), feedRanges, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs index da5a5a7648..c521e2d28e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/GroupByQueryPipelineStageTests.cs @@ -34,7 +34,6 @@ public async Task SinglePageAsync() List elements = await GroupByQueryPipelineStageTests.CreateAndDrainAsync( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, continuationToken: null, groupByAliasToAggregateType: new Dictionary() { { "name", null }, { "count", AggregateOperator.Sum } }, orderedAliases: new List() { "name", "count" }, @@ -47,7 +46,6 @@ public async Task SinglePageAsync() private static async Task> CreateAndDrainAsync( IReadOnlyList> pages, - ExecutionEnvironment executionEnvironment, CosmosElement continuationToken, IReadOnlyDictionary groupByAliasToAggregateType, IReadOnlyList orderedAliases, @@ -56,7 +54,6 @@ private static async Task> CreateAndDrainAsync( IQueryPipelineStage source = new MockQueryPipelineStage(pages); TryCatch tryCreateGroupByStage = GroupByQueryPipelineStage.MonadicCreate( - executionEnvironment: executionEnvironment, continuationToken: continuationToken, monadicCreatePipelineStage: (CosmosElement continuationToken) => TryCatch.FromResult(source), aggregates: new AggregateOperator[] { }, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/SkipQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/SkipQueryPipelineStageTests.cs index c134d4b72d..b39a106a9b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/SkipQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/SkipQueryPipelineStageTests.cs @@ -32,7 +32,6 @@ public async Task SanityTests() { List elements = await SkipQueryPipelineStageTests.CreateAndDrainAsync( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, offsetCount: offsetCount, continuationToken: null); @@ -42,14 +41,12 @@ public async Task SanityTests() private static async Task> CreateAndDrainAsync( IReadOnlyList> pages, - ExecutionEnvironment executionEnvironment, int offsetCount, CosmosElement continuationToken) { IQueryPipelineStage source = new MockQueryPipelineStage(pages); TryCatch tryCreateSkipQueryPipelineStage = SkipQueryPipelineStage.MonadicCreate( - executionEnvironment: executionEnvironment, offsetCount: offsetCount, continuationToken: continuationToken, monadicCreatePipelineStage: (CosmosElement continuationToken) => TryCatch.FromResult(source)); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/TakeQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/TakeQueryPipelineStageTests.cs index c86e2c7905..52206dab12 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/TakeQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/TakeQueryPipelineStageTests.cs @@ -32,7 +32,6 @@ public async Task SanityTests() { (List elements, _) = await TakeQueryPipelineStageTests.CreateAndDrainAsync( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, takeCount: takeCount, continuationToken: null); @@ -71,7 +70,6 @@ public async Task BasicTests() { (List elements, long pageIndex) = await TakeQueryPipelineStageTests.CreateAndDrainAsync( pages: pages, - executionEnvironment: ExecutionEnvironment.Compute, takeCount: takeCount, continuationToken: null); @@ -82,14 +80,12 @@ public async Task BasicTests() private static async Task<(List, long)> CreateAndDrainAsync( IReadOnlyList> pages, - ExecutionEnvironment executionEnvironment, int takeCount, CosmosElement continuationToken) { MockQueryPipelineStage source = new MockQueryPipelineStage(pages); TryCatch tryCreateSkipQueryPipelineStage = TakeQueryPipelineStage.MonadicCreateLimitStage( - executionEnvironment: executionEnvironment, limitCount: takeCount, requestContinuationToken: continuationToken, monadicCreatePipelineStage: (CosmosElement continuationToken) => TryCatch.FromResult(source)); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/SubpartitionTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/SubpartitionTests.cs index 2308ffa424..a3ada2f8ee 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/SubpartitionTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/SubpartitionTests.cs @@ -134,7 +134,6 @@ public override SubpartitionTestOutput ExecuteTest(SubpartitionTestInput input) partitionKey: queryRequestOptions.PartitionKey, properties: new Dictionary() { { "x-ms-query-partitionkey-definition", partitionKeyDefinition } }, partitionedQueryExecutionInfo: null, - executionEnvironment: null, returnResultsInDeterministicOrder: null, enableOptimisticDirectExecution: queryRequestOptions.EnableOptimisticDirectExecution, isNonStreamingOrderByQueryFeatureDisabled: queryRequestOptions.IsNonStreamingOrderByQueryFeatureDisabled, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs index c06d7c57dd..1fd8a1d2d1 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Tracing/TraceWriterBaselineTests.cs @@ -749,7 +749,6 @@ internal static void SetEndRequestTime( private static IQueryPipelineStage CreatePipeline(IDocumentContainer documentContainer, string query, int pageSize = 10, CosmosElement state = null) { TryCatch tryCreatePipeline = PipelineFactory.MonadicCreate( - ExecutionEnvironment.Compute, documentContainer, new SqlQuerySpec(query), new List() { FeedRangeEpk.FullRange }, From 45ac13fc8a540e6ecac77d4c07abf1a04c82e87d Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Tue, 25 Jun 2024 11:26:13 -0700 Subject: [PATCH 3/3] Remove GetCosmosElementContinuationToken from FeedIterator --- .../src/ChangeFeed/ChangeFeedIteratorCore.cs | 5 -- ...geFeedPartitionKeyResultSetIteratorCore.cs | 5 -- .../src/ChangeFeed/StandByFeedIteratorCore.cs | 5 -- .../src/Query/v3Query/QueryIterator.cs | 2 - .../src/ReadFeed/ReadFeedIteratorCore.cs | 5 -- .../FeedIterators/FeedIteratorCore.cs | 10 --- .../FeedIterators/FeedIteratorInlineCore.cs | 5 -- .../FeedIteratorInlineCore{T}.cs | 5 -- .../FeedIterators/FeedIteratorInternal.cs | 2 - .../FeedIterators/FeedIteratorInternal{T}.cs | 2 - .../Query/AggregateQueryTests.cs | 2 +- .../Query/DistinctQueryTests.cs | 6 +- .../Query/GroupByQueryTests.cs | 27 +------ .../Query/QueryTestsBase.cs | 81 +------------------ .../Query/SanityQueryTests.cs | 5 +- 15 files changed, 8 insertions(+), 159 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs index bafe2c3b89..c923ef17fe 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs @@ -340,11 +340,6 @@ private async Task ReadNextInternalAsync(ITrace trace, Cancella return responseMessage; } - public override CosmosElement GetCosmosElementContinuationToken() - { - throw new NotSupportedException(); - } - private sealed class ChangeFeedStateFromToChangeFeedCrossFeedRangeState : ChangeFeedStartFromVisitor> { public static readonly ChangeFeedStateFromToChangeFeedCrossFeedRangeState Singleton = new ChangeFeedStateFromToChangeFeedCrossFeedRangeState(); diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs index 738a1d23b7..0456fc1d87 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedPartitionKeyResultSetIteratorCore.cs @@ -85,11 +85,6 @@ private ChangeFeedPartitionKeyResultSetIteratorCore( public override bool HasMoreResults => this.hasMoreResultsInternal; - public override CosmosElement GetCosmosElementContinuationToken() - { - throw new NotImplementedException(); - } - /// /// Get the next set of results from the cosmos service /// diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs index ed6f6017dd..e9cbcfbdfd 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/StandByFeedIteratorCore.cs @@ -207,10 +207,5 @@ internal virtual Task NextResultSetDelegateAsync( trace: trace, cancellationToken: cancellationToken); } - - public override CosmosElement GetCosmosElementContinuationToken() - { - throw new NotImplementedException(); - } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs b/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs index e9d8da0ab5..48cadec045 100644 --- a/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs @@ -265,8 +265,6 @@ public override async Task ReadNextAsync(ITrace trace, Cancella trace: trace); } - public override CosmosElement GetCosmosElementContinuationToken() => this.queryPipelineStage.Current.Result.State?.Value; - protected override void Dispose(bool disposing) { this.queryPipelineStage.DisposeAsync(); diff --git a/Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs index 1e997d0b8c..9bc8c10914 100644 --- a/Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/ReadFeed/ReadFeedIteratorCore.cs @@ -350,10 +350,5 @@ public override async Task ReadNextAsync( Content = page.Content, }; } - - public override CosmosElement GetCosmosElementContinuationToken() - { - throw new NotSupportedException(); - } } } diff --git a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorCore.cs index 13d0ba2336..9b630c3859 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorCore.cs @@ -131,11 +131,6 @@ public override async Task ReadNextAsync( return responseMessage; } - public override CosmosElement GetCosmosElementContinuationToken() - { - throw new NotImplementedException(); - } - private static async Task RewriteStreamAsTextAsync(ResponseMessage responseMessage, QueryRequestOptions requestOptions, ITrace trace) { using (ITrace rewriteTrace = trace.StartChild("Rewrite Stream as Text", TraceComponent.Json, TraceLevel.Info)) @@ -221,11 +216,6 @@ internal FeedIteratorCore( public override bool HasMoreResults => this.feedIterator.HasMoreResults; - public override CosmosElement GetCosmosElementContinuationToken() - { - return this.feedIterator.GetCosmosElementContinuationToken(); - } - /// /// Get the next set of results from the cosmos service /// diff --git a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore.cs index 674f3adbea..87d0de1ef5 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore.cs @@ -44,11 +44,6 @@ internal FeedIteratorInlineCore( public override bool HasMoreResults => this.feedIteratorInternal.HasMoreResults; - public override CosmosElement GetCosmosElementContinuationToken() - { - return this.feedIteratorInternal.GetCosmosElementContinuationToken(); - } - public override Task ReadNextAsync(CancellationToken cancellationToken = default) { return this.clientContext.OperationHelperAsync( diff --git a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore{T}.cs b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore{T}.cs index cebfadb4c4..010dc6569a 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore{T}.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInlineCore{T}.cs @@ -61,11 +61,6 @@ public override Task> ReadNextAsync(ITrace trace, CancellationTo return TaskHelper.RunInlineIfNeededAsync(() => this.feedIteratorInternal.ReadNextAsync(trace, cancellationToken)); } - public override CosmosElement GetCosmosElementContinuationToken() - { - return this.feedIteratorInternal.GetCosmosElementContinuationToken(); - } - protected override void Dispose(bool disposing) { this.feedIteratorInternal.Dispose(); diff --git a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal.cs b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal.cs index 8dac3506da..e4b53fc8f5 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal.cs @@ -21,8 +21,6 @@ namespace Microsoft.Azure.Cosmos #endif abstract class FeedIteratorInternal : FeedIterator { - public abstract CosmosElement GetCosmosElementContinuationToken(); - public static bool IsRetriableException(CosmosException cosmosException) { return ((int)cosmosException.StatusCode == 429) diff --git a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal{T}.cs b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal{T}.cs index afe0a7b20a..2d26e38b67 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal{T}.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedIteratorInternal{T}.cs @@ -35,7 +35,5 @@ private async Task> ReadNextWithRootTraceAsync(CancellationToken } public abstract Task> ReadNextAsync(ITrace trace, CancellationToken cancellationToken); - - public abstract CosmosElement GetCosmosElementContinuationToken(); } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/AggregateQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/AggregateQueryTests.cs index ce31558a84..32a75eb9e4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/AggregateQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/AggregateQueryTests.cs @@ -1188,7 +1188,7 @@ async static Task ImplementationAsync(Container container, IReadOnlyList d MaxConcurrency = 10, MaxItemCount = 100, }, - QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.HoldState); documentsFromWithoutDistinct = documentsFromWithoutDistinct .Where(document => documentsSeen.Add(document, out UInt128 hash)) .ToList(); @@ -241,7 +241,7 @@ async Task ImplemenationAsync(Container container, IReadOnlyList d MaxConcurrency = 10, MaxItemCount = pageSize }, - QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.HoldState); string[] expectedDocuments = documentsFromWithoutDistinct.Select(x => x.ToString()).ToArray(); string[] actualDocuments = documentsFromWithDistinct.Select(x => x.ToString()).ToArray(); @@ -262,7 +262,7 @@ async Task ImplemenationAsync(Container container, IReadOnlyList d MaxConcurrency = 10, MaxItemCount = pageSize }, - QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.HoldState); Assert.AreEqual(1, documentsWithDCount.Count); long dcount = Number64.ToLong((documentsWithDCount.First() as CosmosNumber).Value); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/GroupByQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/GroupByQueryTests.cs index 0287b2d0d4..fc8e56bb04 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/GroupByQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/GroupByQueryTests.cs @@ -514,18 +514,6 @@ FROM c queryRequestOptions); HashSet actualWithoutContinuationTokensSet = new HashSet(actualWithoutContinuationTokens); - List actualWithTryGetContinuationTokens = await QueryTestsBase.QueryWithCosmosElementContinuationTokenAsync( - container, - query, - queryRequestOptions); - HashSet actualWithTryGetContinuationTokensSet = new HashSet(actualWithTryGetContinuationTokens); - - Assert.IsTrue( - actualWithoutContinuationTokensSet.SetEquals(actualWithTryGetContinuationTokensSet), - $"Results did not match for query: {query} with maxItemCount: {maxItemCount}" + - $"ActualWithoutContinuationTokens: {JsonConvert.SerializeObject(actualWithoutContinuationTokensSet)}" + - $"ActualWithTryGetContinuationTokens: {JsonConvert.SerializeObject(actualWithTryGetContinuationTokensSet)}"); - HashSet expectedSet = new HashSet(expectedResults); Assert.IsTrue( @@ -583,27 +571,14 @@ FROM c this.NormalizeGroupByArrayAggregateResults(actualWithoutContinuationTokens); HashSet actualWithoutContinuationTokensSet = new HashSet(actualWithoutContinuationTokens); - List actualWithTryGetContinuationTokens = await QueryTestsBase.QueryWithCosmosElementContinuationTokenAsync( - container, - query, - queryRequestOptions); - this.NormalizeGroupByArrayAggregateResults(actualWithTryGetContinuationTokens); - HashSet actualWithTryGetContinuationTokensSet = new HashSet(actualWithTryGetContinuationTokens); - List actualWithCombinations = await QueryTestsBase.RunQueryCombinationsAsync( container, query, queryRequestOptions, - QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.HoldState); this.NormalizeGroupByArrayAggregateResults(actualWithCombinations); HashSet actualWithCombinationsSet = new HashSet(actualWithCombinations); - Assert.IsTrue( - actualWithoutContinuationTokensSet.SetEquals(actualWithTryGetContinuationTokensSet), - $"Results did not match for query: {query} with maxItemCount: {maxItemCount}" + - $"ActualWithoutContinuationTokens: {JsonConvert.SerializeObject(actualWithoutContinuationTokensSet)}" + - $"ActualWithTryGetContinuationTokens: {JsonConvert.SerializeObject(actualWithTryGetContinuationTokensSet)}"); - Assert.IsTrue( actualWithoutContinuationTokensSet.SetEquals(actualWithCombinationsSet), $"Results did not match for query: {query} with maxItemCount: {maxItemCount}" + diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs index 1c20a3bb0e..99048cd125 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/QueryTestsBase.cs @@ -558,74 +558,6 @@ internal CosmosClient CreateNewCosmosClient(ConnectionMode connectionMode) }; } - internal static async Task> QueryWithCosmosElementContinuationTokenAsync( - Container container, - string query, - QueryRequestOptions queryRequestOptions = null) - { - if (queryRequestOptions == null) - { - queryRequestOptions = new QueryRequestOptions(); - } - - List resultsFromCosmosElementContinuationToken = new List(); - CosmosElement continuationToken = null; - do - { - QueryRequestOptions computeRequestOptions = new QueryRequestOptions - { - IfMatchEtag = queryRequestOptions.IfMatchEtag, - IfNoneMatchEtag = queryRequestOptions.IfNoneMatchEtag, - MaxItemCount = queryRequestOptions.MaxItemCount, - ResponseContinuationTokenLimitInKb = queryRequestOptions.ResponseContinuationTokenLimitInKb, - EnableScanInQuery = queryRequestOptions.EnableScanInQuery, - EnableLowPrecisionOrderBy = queryRequestOptions.EnableLowPrecisionOrderBy, - MaxBufferedItemCount = queryRequestOptions.MaxBufferedItemCount, - SessionToken = queryRequestOptions.SessionToken, - ConsistencyLevel = queryRequestOptions.ConsistencyLevel, - MaxConcurrency = queryRequestOptions.MaxConcurrency, - PartitionKey = queryRequestOptions.PartitionKey, - CosmosSerializationFormatOptions = queryRequestOptions.CosmosSerializationFormatOptions, - Properties = queryRequestOptions.Properties, - IsEffectivePartitionKeyRouting = queryRequestOptions.IsEffectivePartitionKeyRouting, - CosmosElementContinuationToken = queryRequestOptions.CosmosElementContinuationToken, - EnableOptimisticDirectExecution = queryRequestOptions.EnableOptimisticDirectExecution, - TestSettings = queryRequestOptions.TestSettings, - }; - - computeRequestOptions.CosmosElementContinuationToken = continuationToken; - - using (FeedIteratorInternal itemQuery = (FeedIteratorInternal)container.GetItemQueryIterator( - queryText: query, - requestOptions: computeRequestOptions)) - { - try - { - FeedResponse cosmosQueryResponse = await itemQuery.ReadNextAsync(); - if (queryRequestOptions.MaxItemCount.HasValue) - { - Assert.IsTrue( - cosmosQueryResponse.Count <= queryRequestOptions.MaxItemCount.Value, - $"Max Item Count is not being honored. Got {cosmosQueryResponse.Count} documents when {queryRequestOptions.MaxItemCount.Value} is the max."); - } - - resultsFromCosmosElementContinuationToken.AddRange(cosmosQueryResponse); - - // Force a rewrite of the continuation token, so that we test the case where we roundtrip it over the wire. - // There was a bug where resuming from double.NaN lead to an exception, - // since we parsed the type assuming it was always a double and not a string. - CosmosElement originalContinuationToken = itemQuery.GetCosmosElementContinuationToken(); - continuationToken = originalContinuationToken != null ? CosmosElement.Parse(originalContinuationToken.ToString()) : null; - } - catch (CosmosException cosmosException) when (cosmosException.StatusCode == (HttpStatusCode)429) - { - } - } - } while (continuationToken != null); - - return resultsFromCosmosElementContinuationToken; - } - internal static async Task> QueryWithContinuationTokensAsync( Container container, string query, @@ -779,7 +711,7 @@ internal static Task> RunQueryAsync( container, query, queryRequestOptions, - QueryDrainingMode.ContinuationToken | QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.ContinuationToken | QueryDrainingMode.HoldState); } [Flags] @@ -788,7 +720,6 @@ public enum QueryDrainingMode None = 0, HoldState = 1, ContinuationToken = 2, - CosmosElementContinuationToken = 4, } internal static Task> RunQueryCombinationsAsync( @@ -833,16 +764,6 @@ internal static async Task> RunQueryCombinationsAsync( queryExecutionResults[QueryDrainingMode.ContinuationToken] = queryResultsWithContinuationTokens; } - if (queryDrainingMode.HasFlag(QueryDrainingMode.CosmosElementContinuationToken)) - { - List queryResultsWithCosmosElementContinuationToken = await QueryWithCosmosElementContinuationTokenAsync( - container, - query, - queryRequestOptions); - - queryExecutionResults[QueryDrainingMode.CosmosElementContinuationToken] = queryResultsWithCosmosElementContinuationToken; - } - foreach (QueryDrainingMode queryDrainingMode1 in queryExecutionResults.Keys) { foreach (QueryDrainingMode queryDrainingMode2 in queryExecutionResults.Keys) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/SanityQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/SanityQueryTests.cs index 4a5f02ff7c..f28ae7a98b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/SanityQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Query/SanityQueryTests.cs @@ -352,7 +352,6 @@ async Task ValidateNonDeterministicQuery( await ValidateNonDeterministicQuery(QueryTestsBase.QueryWithoutContinuationTokensAsync, useOrderBy); await ValidateNonDeterministicQuery(QueryTestsBase.QueryWithContinuationTokensAsync, useOrderBy); - await ValidateNonDeterministicQuery(QueryTestsBase.QueryWithCosmosElementContinuationTokenAsync, useOrderBy); } } } @@ -732,7 +731,7 @@ async Task> AssertPassthroughAsync(string query, Cosmos.Part containerWithForcedPlan, query, feedOptions, - QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.HoldState); Assert.IsTrue(feedOptions.TestSettings.Stats.PipelineType.HasValue); Assert.AreEqual(TestInjections.PipelineType.Passthrough, feedOptions.TestSettings.Stats.PipelineType.Value); @@ -768,7 +767,7 @@ async Task> AssertSpecializedAsync(string query, Cosmos.Part containerWithForcedPlan, query, feedOptions, - QueryDrainingMode.HoldState | QueryDrainingMode.CosmosElementContinuationToken); + QueryDrainingMode.HoldState); Assert.IsTrue(feedOptions.TestSettings.Stats.PipelineType.HasValue); Assert.AreEqual(TestInjections.PipelineType.Specialized, feedOptions.TestSettings.Stats.PipelineType.Value);