diff --git a/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs b/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs index 4f2edb99cf..fb6f1580d1 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs @@ -45,12 +45,6 @@ public static CosmosElement CreateFromBuffer(ReadOnlyMemory buffer) return CosmosElement.Dispatch(jsonNavigator, jsonNavigatorNode); } - public static CosmosElement Parse(string json) - { - byte[] buffer = Encoding.UTF8.GetBytes(json); - return CosmosElement.CreateFromBuffer(buffer); - } - public static CosmosElement Dispatch( IJsonNavigator jsonNavigator, IJsonNavigatorNode jsonNavigatorNode) @@ -131,43 +125,55 @@ public static CosmosElement Dispatch( return item; } - public static bool TryParse(string serializedCosmosElement, out CosmosElement cosmosElement) + public static CosmosElement Parse(string json) + { + if (!CosmosElement.TryParse(json, out CosmosElement cosmosElement)) + { + throw new ArgumentException($"Failed to parse json: {json}."); + } + + return cosmosElement; + } + + public static bool TryParse( + string serializedCosmosElement, + out CosmosElement cosmosElement) { - cosmosElement = default(CosmosElement); if (serializedCosmosElement == null) { throw new ArgumentNullException(nameof(serializedCosmosElement)); } - byte[] buffer = Encoding.UTF8.GetBytes(serializedCosmosElement); try { + byte[] buffer = Encoding.UTF8.GetBytes(serializedCosmosElement); cosmosElement = CosmosElement.CreateFromBuffer(buffer); - return true; } catch (JsonParseException) { - cosmosElement = default(CosmosElement); - return false; + cosmosElement = default; } + + return cosmosElement != default; } - public static bool TryParse(string serializedCosmosElement, out TCosmosElement cosmosElement) - where TCosmosElement : CosmosElement + public static bool TryParse( + string serializedCosmosElement, + out TCosmosElement typedCosmosElement) { - if (!CosmosElement.TryParse(serializedCosmosElement, out CosmosElement rawCosmosElement)) + if (!CosmosElement.TryParse(serializedCosmosElement, out CosmosElement cosmosElement)) { - cosmosElement = default(TCosmosElement); + typedCosmosElement = default; return false; } - if (!(rawCosmosElement is TCosmosElement typedCosmosElement)) + if (!(cosmosElement is TCosmosElement tempCosmosElement)) { - cosmosElement = default(TCosmosElement); + typedCosmosElement = default; return false; } - cosmosElement = typedCosmosElement; + typedCosmosElement = tempCosmosElement; return true; } } diff --git a/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosObject.EagerCosmosObject.cs b/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosObject.EagerCosmosObject.cs index f5362fb6a0..5c3f016a6f 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosObject.EagerCosmosObject.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosElements/CosmosObject.EagerCosmosObject.cs @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.CosmosElements using System; using System.Collections.Generic; using Microsoft.Azure.Cosmos.Json; - using Newtonsoft.Json; #if INTERNAL #pragma warning disable CS1591 // Missing XML comment for publicly visible type or member @@ -48,11 +47,6 @@ public override bool TryGetValue(string key, out CosmosElement value) return this.dictionary.TryGetValue(key, out value); } - public override string ToString() - { - return JsonConvert.SerializeObject(this.dictionary); - } - public override void WriteTo(IJsonWriter jsonWriter) { if (jsonWriter == null) diff --git a/Microsoft.Azure.Cosmos/src/Diagnostics/QueryAggregateDiagnostics.cs b/Microsoft.Azure.Cosmos/src/Diagnostics/QueryAggregateDiagnostics.cs index 97d5634d34..bce974a240 100644 --- a/Microsoft.Azure.Cosmos/src/Diagnostics/QueryAggregateDiagnostics.cs +++ b/Microsoft.Azure.Cosmos/src/Diagnostics/QueryAggregateDiagnostics.cs @@ -5,11 +5,8 @@ namespace Microsoft.Azure.Cosmos.Query { using System; - using System.Collections; using System.Collections.Generic; - using System.Linq; using System.Text; - using Newtonsoft.Json; internal sealed class QueryAggregateDiagnostics : CosmosDiagnostics { @@ -26,7 +23,7 @@ public QueryAggregateDiagnostics( this.Pages = pages; } - private IReadOnlyCollection Pages { get; } + public IReadOnlyCollection Pages { get; } public override string ToString() { diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/AverageAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/AverageAggregator.cs index 90f3727178..1df36faecf 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/AverageAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/AverageAggregator.cs @@ -18,7 +18,12 @@ internal sealed class AverageAggregator : IAggregator /// /// The running weighted average for this aggregator. /// - private AverageInfo globalAverage = new AverageInfo(0, 0); + private AverageInfo globalAverage; + + private AverageAggregator(AverageInfo globalAverage) + { + this.globalAverage = globalAverage; + } /// /// Averages the supplied item with the previously supplied items. @@ -40,6 +45,29 @@ public CosmosElement GetResult() return this.globalAverage.GetAverage(); } + public string GetContinuationToken() + { + return this.globalAverage.ToString(); + } + + public static AverageAggregator Create(string continuationToken) + { + AverageInfo averageInfo; + if (continuationToken != null) + { + if (!AverageInfo.TryParse(continuationToken, out averageInfo)) + { + throw new ArgumentException($"Invalid continuation token: {continuationToken}"); + } + } + else + { + averageInfo = new AverageInfo(0, 0); + } + + return new AverageAggregator(averageInfo); + } + /// /// Struct that stores a weighted average as a sum and count so they that average across different partitions with different numbers of documents can be taken. /// @@ -166,6 +194,31 @@ public CosmosNumber GetAverage() return CosmosNumber64.Create(this.Sum.Value / this.Count); } + + public override string ToString() + { + return $@"{{ + {(this.Sum.HasValue ? $@"""{SumName}"" : {this.Sum.Value}," : string.Empty)} + ""{CountName}"" : {this.Count} + }}"; + } + + public static bool TryParse(string serializedAverageInfo, out AverageInfo averageInfo) + { + if (serializedAverageInfo == null) + { + throw new ArgumentNullException(nameof(serializedAverageInfo)); + } + + if (!CosmosElement.TryParse(serializedAverageInfo, out CosmosElement cosmosElementAverageInfo)) + { + averageInfo = default(AverageInfo); + return false; + } + + averageInfo = AverageInfo.Create(cosmosElementAverageInfo); + return true; + } } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/CountAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/CountAggregator.cs index 718f818227..979d2c5496 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/CountAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/CountAggregator.cs @@ -4,6 +4,8 @@ namespace Microsoft.Azure.Cosmos.Query.Aggregation { using System; + using System.Globalization; + using System.Net; using Microsoft.Azure.Cosmos.CosmosElements; /// @@ -18,6 +20,16 @@ internal sealed class CountAggregator : IAggregator /// private long globalCount; + private CountAggregator(long initialCount) + { + if (initialCount < 0) + { + throw new ArgumentOutOfRangeException(nameof(initialCount)); + } + + this.globalCount = initialCount; + } + /// /// Adds a count to the running count. /// @@ -47,5 +59,30 @@ public CosmosElement GetResult() { return CosmosNumber64.Create(this.globalCount); } + + public string GetContinuationToken() + { + return this.globalCount.ToString(CultureInfo.InvariantCulture); + } + + public static CountAggregator Create(string continuationToken) + { + long partialCount; + if (continuationToken != null) + { + if (!long.TryParse(continuationToken, out partialCount)) + { + throw new CosmosException( + HttpStatusCode.BadRequest, + $@"Invalid count continuation token: ""{continuationToken}""."); + } + } + else + { + partialCount = 0; + } + + return new CountAggregator(initialCount: partialCount); + } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/IAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/IAggregator.cs index 662156df5d..9f2571724c 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/IAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/IAggregator.cs @@ -21,5 +21,11 @@ internal interface IAggregator /// /// The result of the aggregation. CosmosElement GetResult(); + + /// + /// Gets a continuation token that stores the partial aggregate up till this point. + /// + /// A continuation token that stores the partial aggregate up till this point. + string GetContinuationToken(); } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/MinMaxAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/MinMaxAggregator.cs index 1363a178b9..a3784e60a1 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/MinMaxAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/MinMaxAggregator.cs @@ -13,6 +13,10 @@ namespace Microsoft.Azure.Cosmos.Query.Aggregation /// internal sealed class MinMaxAggregator : IAggregator { + private const string MinValueContinuationToken = "MIN_VALUE"; + private const string MaxValueContinuationToken = "MAX_VALUE"; + private const string UndefinedContinuationToken = "UNDEFINED"; + private static readonly CosmosElement Undefined = null; /// /// Whether or not the aggregation is a min or a max. @@ -24,17 +28,10 @@ internal sealed class MinMaxAggregator : IAggregator /// private CosmosElement globalMinMax; - public MinMaxAggregator(bool isMinAggregation) + private MinMaxAggregator(bool isMinAggregation, CosmosElement globalMinMax) { this.isMinAggregation = isMinAggregation; - if (this.isMinAggregation) - { - globalMinMax = ItemComparer.MaxValue; - } - else - { - globalMinMax = ItemComparer.MinValue; - } + this.globalMinMax = globalMinMax; } public void Aggregate(CosmosElement localMinMax) @@ -98,7 +95,7 @@ public void Aggregate(CosmosElement localMinMax) if (!ItemComparer.IsMinOrMax(this.globalMinMax) && (!CosmosElementIsPrimitive(localMinMax) || !CosmosElementIsPrimitive(this.globalMinMax))) { - // This means we are comparing non primitives with is undefined + // This means we are comparing non primitives which is undefined this.globalMinMax = Undefined; return; } @@ -136,6 +133,72 @@ public CosmosElement GetResult() return result; } + public string GetContinuationToken() + { + string continuationToken; + if (this.globalMinMax == ItemComparer.MinValue) + { + continuationToken = MinMaxAggregator.MinValueContinuationToken; + } + else if (this.globalMinMax == ItemComparer.MaxValue) + { + continuationToken = MinMaxAggregator.MaxValueContinuationToken; + } + else if (this.globalMinMax == Undefined) + { + continuationToken = MinMaxAggregator.UndefinedContinuationToken; + } + else + { + continuationToken = this.globalMinMax.ToString(); + } + + return continuationToken; + } + + public static MinMaxAggregator CreateMinAggregator(string continuationToken) + { + return MinMaxAggregator.Create(isMinAggregation: true, continuationToken: continuationToken); + } + + public static MinMaxAggregator CreateMaxAggregator(string continuationToken) + { + return MinMaxAggregator.Create(isMinAggregation: false, continuationToken: continuationToken); + } + + private static MinMaxAggregator Create(bool isMinAggregation, string continuationToken) + { + CosmosElement globalMinMax; + if (continuationToken != null) + { + if (continuationToken == MinMaxAggregator.MaxValueContinuationToken) + { + globalMinMax = ItemComparer.MaxValue; + } + else if (continuationToken == MinMaxAggregator.MinValueContinuationToken) + { + globalMinMax = ItemComparer.MinValue; + } + else if (continuationToken == MinMaxAggregator.UndefinedContinuationToken) + { + globalMinMax = MinMaxAggregator.Undefined; + } + else + { + if (!CosmosElement.TryParse(continuationToken, out globalMinMax)) + { + throw new ArgumentException($"Malformed continuation token: {continuationToken}"); + } + } + } + else + { + globalMinMax = isMinAggregation ? (CosmosElement)ItemComparer.MaxValue : (CosmosElement)ItemComparer.MinValue; + } + + return new MinMaxAggregator(isMinAggregation: isMinAggregation, globalMinMax: globalMinMax); + } + private static bool CosmosElementIsPrimitive(CosmosElement cosmosElement) { if (cosmosElement == null) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SingleGroupAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SingleGroupAggregator.cs index fa9e2a871f..5a4cb61ef9 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SingleGroupAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SingleGroupAggregator.cs @@ -26,10 +26,14 @@ internal abstract class SingleGroupAggregator /// public abstract CosmosElement GetResult(); + public abstract string GetContinuationToken(); + public static SingleGroupAggregator Create( + CosmosQueryClient queryClient, AggregateOperator[] aggregates, IReadOnlyDictionary aggregateAliasToAggregateType, - bool hasSelectValue) + bool hasSelectValue, + string continuationToken) { SingleGroupAggregator aggregateValues; if (hasSelectValue) @@ -37,17 +41,17 @@ public static SingleGroupAggregator Create( if (aggregates != null && aggregates.Any()) { // SELECT VALUE - aggregateValues = SelectValueAggregateValues.Create(aggregates[0]); + aggregateValues = SelectValueAggregateValues.Create(aggregates[0], continuationToken); } else { // SELECT VALUE - aggregateValues = SelectValueAggregateValues.Create(aggregateOperator: null); + aggregateValues = SelectValueAggregateValues.Create(aggregateOperator: null, continuationToken: null); } } else { - aggregateValues = SelectListAggregateValues.Create(aggregateAliasToAggregateType); + aggregateValues = SelectListAggregateValues.Create(queryClient, aggregateAliasToAggregateType, continuationToken); } return aggregateValues; @@ -71,9 +75,9 @@ private SelectValueAggregateValues(AggregateValue aggregateValue) this.aggregateValue = aggregateValue; } - public static SelectValueAggregateValues Create(AggregateOperator? aggregateOperator) + public static SelectValueAggregateValues Create(AggregateOperator? aggregateOperator, string continuationToken) { - AggregateValue aggregateValue = AggregateValue.Create(aggregateOperator); + AggregateValue aggregateValue = AggregateValue.Create(aggregateOperator, continuationToken); return new SelectValueAggregateValues(aggregateValue); } @@ -87,6 +91,11 @@ public override CosmosElement GetResult() return this.aggregateValue.Result; } + public override string GetContinuationToken() + { + return this.aggregateValue.GetContinuationToken(); + } + public override string ToString() { return this.aggregateValue.ToString(); @@ -123,14 +132,59 @@ public override CosmosElement GetResult() return CosmosObject.Create(aliasToElement); } - public static SelectListAggregateValues Create(IReadOnlyDictionary aggregateAliasToAggregateType) + public override string GetContinuationToken() + { + Dictionary aliasToContinuationToken = new Dictionary(); + foreach (KeyValuePair kvp in this.aliasToValue) + { + aliasToContinuationToken[kvp.Key] = CosmosString.Create(kvp.Value.GetContinuationToken()); + } + + CosmosObject cosmosObject = CosmosObject.Create(aliasToContinuationToken); + return cosmosObject.ToString(); + } + + public static SelectListAggregateValues Create( + CosmosQueryClient cosmosQueryClient, + IReadOnlyDictionary aggregateAliasToAggregateType, + string continuationToken) { + CosmosObject aliasToContinuationToken; + if (continuationToken != null) + { + if (!CosmosElement.TryParse(continuationToken, out aliasToContinuationToken)) + { + throw cosmosQueryClient.CreateBadRequestException( + $"{nameof(SelectListAggregateValues)} continuation token is malformed: {continuationToken}."); + } + } + else + { + aliasToContinuationToken = null; + } + Dictionary groupingTable = new Dictionary(); foreach (KeyValuePair aliasToAggregate in aggregateAliasToAggregateType) { string alias = aliasToAggregate.Key; AggregateOperator? aggregateOperator = aliasToAggregate.Value; - groupingTable[alias] = AggregateValue.Create(aggregateOperator); + string aliasContinuationToken; + if (aliasToContinuationToken != null) + { + if (!(aliasToContinuationToken[alias] is CosmosString parsedAliasContinuationToken)) + { + throw cosmosQueryClient.CreateBadRequestException( + $"{nameof(SelectListAggregateValues)} continuation token is malformed: {continuationToken}."); + } + + aliasContinuationToken = parsedAliasContinuationToken.Value; + } + else + { + aliasContinuationToken = null; + } + + groupingTable[alias] = AggregateValue.Create(aggregateOperator, aliasContinuationToken); } return new SelectListAggregateValues(groupingTable); @@ -168,21 +222,23 @@ private abstract class AggregateValue public abstract CosmosElement Result { get; } + public abstract string GetContinuationToken(); + public override string ToString() { return this.Result.ToString(); } - public static AggregateValue Create(AggregateOperator? aggregateOperator) + public static AggregateValue Create(AggregateOperator? aggregateOperator, string continuationToken) { AggregateValue value; if (aggregateOperator.HasValue) { - value = AggregateAggregateValue.Create(aggregateOperator.Value); + value = AggregateAggregateValue.Create(aggregateOperator.Value, continuationToken); } else { - value = ScalarAggregateValue.Create(); + value = ScalarAggregateValue.Create(continuationToken); } return value; @@ -210,29 +266,36 @@ public override void AddValue(CosmosElement aggregateValue) this.aggregator.Aggregate(aggregateItem.Item); } - public static AggregateAggregateValue Create(AggregateOperator aggregateOperator) + public override string GetContinuationToken() + { + return this.aggregator.GetContinuationToken(); + } + + public static AggregateAggregateValue Create( + AggregateOperator aggregateOperator, + string continuationToken) { IAggregator aggregator; switch (aggregateOperator) { case AggregateOperator.Average: - aggregator = new AverageAggregator(); + aggregator = AverageAggregator.Create(continuationToken); break; case AggregateOperator.Count: - aggregator = new CountAggregator(); + aggregator = CountAggregator.Create(continuationToken); break; case AggregateOperator.Max: - aggregator = new MinMaxAggregator(isMinAggregation: false); + aggregator = MinMaxAggregator.CreateMaxAggregator(continuationToken); break; case AggregateOperator.Min: - aggregator = new MinMaxAggregator(isMinAggregation: true); + aggregator = MinMaxAggregator.CreateMinAggregator(continuationToken); break; case AggregateOperator.Sum: - aggregator = new SumAggregator(); + aggregator = SumAggregator.Create(continuationToken); break; default: @@ -248,10 +311,10 @@ private sealed class ScalarAggregateValue : AggregateValue private CosmosElement value; private bool initialized; - private ScalarAggregateValue() + private ScalarAggregateValue(CosmosElement initialValue, bool initialized) { - this.value = null; - this.initialized = false; + this.value = initialValue; + this.initialized = initialized; } public override CosmosElement Result @@ -267,9 +330,30 @@ public override CosmosElement Result } } - public static ScalarAggregateValue Create() + public override string GetContinuationToken() { - return new ScalarAggregateValue(); + return this.value.ToString(); + } + + public static ScalarAggregateValue Create(string continuationToken) + { + CosmosElement initialValue; + bool initialized; + if (continuationToken != null) + { + if (!CosmosElement.TryParse(continuationToken, out initialValue)) + { + throw new ArgumentException($"Invalid {nameof(ScalarAggregateValue)}: {continuationToken}"); + } + initialized = true; + } + else + { + initialValue = null; + initialized = false; + } + + return new ScalarAggregateValue(initialValue, initialized); } public override void AddValue(CosmosElement aggregateValue) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SumAggregator.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SumAggregator.cs index dff5dcd25b..0e3b416d6f 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SumAggregator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Aggregation/SumAggregator.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Cosmos.Query.Aggregation { using System; + using System.Globalization; using Microsoft.Azure.Cosmos.CosmosElements; /// @@ -18,6 +19,11 @@ internal sealed class SumAggregator : IAggregator /// private double globalSum; + private SumAggregator(double globalSum) + { + this.globalSum = globalSum; + } + /// /// Adds a local sum to the global sum. /// @@ -60,5 +66,25 @@ public CosmosElement GetResult() return CosmosNumber64.Create(this.globalSum); } + + public string GetContinuationToken() + { + return this.globalSum.ToString("G17", CultureInfo.InvariantCulture); + } + + public static SumAggregator Create(string continuationToken) + { + double partialSum; + if (continuationToken != null) + { + partialSum = double.Parse(continuationToken, CultureInfo.InvariantCulture); + } + else + { + partialSum = 0.0; + } + + return new SumAggregator(partialSum); + } } } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.Client.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.Client.cs new file mode 100644 index 0000000000..9a1b06aa49 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.Client.cs @@ -0,0 +1,109 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.CosmosElements; + + internal abstract partial class AggregateDocumentQueryExecutionComponent : DocumentQueryExecutionComponentBase + { + private sealed class ClientAggregateDocumentQueryExecutionComponent : AggregateDocumentQueryExecutionComponent + { + private ClientAggregateDocumentQueryExecutionComponent( + IDocumentQueryExecutionComponent source, + SingleGroupAggregator singleGroupAggregator, + bool isValueAggregateQuery) + : base(source, singleGroupAggregator, isValueAggregateQuery) + { + // all the work is done in the base constructor. + } + + public static async Task CreateAsync( + CosmosQueryClient queryClient, + AggregateOperator[] aggregates, + IReadOnlyDictionary aliasToAggregateType, + bool hasSelectValue, + string requestContinuation, + Func> createSourceCallback) + { + IDocumentQueryExecutionComponent source = await createSourceCallback(requestContinuation); + SingleGroupAggregator singleGroupAggregator = SingleGroupAggregator.Create( + queryClient, + aggregates, + aliasToAggregateType, + hasSelectValue, + continuationToken: null); + + return new ClientAggregateDocumentQueryExecutionComponent( + source, + singleGroupAggregator, + hasSelectValue); + } + + public override async Task DrainAsync( + int maxElements, + CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + // Note-2016-10-25-felixfan: Given what we support now, we should expect to return only 1 document. + // Note-2019-07-11-brchon: We can return empty pages until all the documents are drained, + // but then we will have to design a continuation token. + + double requestCharge = 0; + long responseLengthBytes = 0; + List diagnosticsPages = new List(); + while (!this.Source.IsDone) + { + QueryResponseCore sourceResponse = await this.Source.DrainAsync(int.MaxValue, cancellationToken); + if (!sourceResponse.IsSuccess) + { + return sourceResponse; + } + + requestCharge += sourceResponse.RequestCharge; + responseLengthBytes += sourceResponse.ResponseLengthBytes; + if (sourceResponse.Diagnostics != null) + { + diagnosticsPages.AddRange(sourceResponse.Diagnostics); + } + + foreach (CosmosElement element in sourceResponse.CosmosElements) + { + RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections( + this.isValueAggregateQuery, + element); + this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload); + } + } + + List finalResult = new List(); + CosmosElement aggregationResult = this.singleGroupAggregator.GetResult(); + if (aggregationResult != null) + { + finalResult.Add(aggregationResult); + } + + return QueryResponseCore.CreateSuccess( + result: finalResult, + continuationToken: null, + activityId: null, + disallowContinuationTokenMessage: null, + requestCharge: requestCharge, + diagnostics: diagnosticsPages, + responseLengthBytes: responseLengthBytes); + } + + public override bool TryGetContinuationToken(out string state) + { + // Since we block until we get the final result the continuation token is always null. + state = null; + return true; + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.Compute.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.Compute.cs new file mode 100644 index 0000000000..1342838c03 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.Compute.cs @@ -0,0 +1,285 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.CosmosElements; + + internal abstract partial class AggregateDocumentQueryExecutionComponent : DocumentQueryExecutionComponentBase + { + private static readonly IReadOnlyList EmptyResults = new List().AsReadOnly(); + + private sealed class ComputeAggregateDocumentQueryExecutionComponent : AggregateDocumentQueryExecutionComponent + { + private ComputeAggregateDocumentQueryExecutionComponent( + IDocumentQueryExecutionComponent source, + SingleGroupAggregator singleGroupAggregator, + bool isValueAggregateQuery) + : base(source, singleGroupAggregator, isValueAggregateQuery) + { + // all the work is done in the base constructor. + } + + public static async Task CreateAsync( + CosmosQueryClient queryClient, + AggregateOperator[] aggregates, + IReadOnlyDictionary aliasToAggregateType, + bool hasSelectValue, + string requestContinuation, + Func> createSourceCallback) + { + string sourceContinuationToken; + string singleGroupAggregatorContinuationToken; + if (requestContinuation != null) + { + if (!AggregateContinuationToken.TryParse(requestContinuation, out AggregateContinuationToken aggregateContinuationToken)) + { + throw queryClient.CreateBadRequestException($"Malfomed {nameof(AggregateContinuationToken)}: '{requestContinuation}'"); + } + + sourceContinuationToken = aggregateContinuationToken.SourceContinuationToken; + singleGroupAggregatorContinuationToken = aggregateContinuationToken.SingleGroupAggregatorContinuationToken; + } + else + { + sourceContinuationToken = null; + singleGroupAggregatorContinuationToken = null; + } + + IDocumentQueryExecutionComponent source = await createSourceCallback(sourceContinuationToken); + SingleGroupAggregator singleGroupAggregator = SingleGroupAggregator.Create( + queryClient, + aggregates, + aliasToAggregateType, + hasSelectValue, + singleGroupAggregatorContinuationToken); + + return new ComputeAggregateDocumentQueryExecutionComponent( + source, + singleGroupAggregator, + hasSelectValue); + } + + public override async Task DrainAsync( + int maxElements, + CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + // Draining aggregates is broken down into two stages + QueryResponseCore response; + if (!this.Source.IsDone) + { + // Stage 1: + // Drain the aggregates fully from all continuations and all partitions + // And return empty pages in the meantime. + QueryResponseCore sourceResponse = await this.Source.DrainAsync(int.MaxValue, cancellationToken); + if (!sourceResponse.IsSuccess) + { + return sourceResponse; + } + + foreach (CosmosElement element in sourceResponse.CosmosElements) + { + RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections( + this.isValueAggregateQuery, + element); + this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload); + } + + if (this.Source.IsDone) + { + response = this.GetFinalResponse(); + } + else + { + response = this.GetEmptyPage(sourceResponse); + } + } + else + { + // Stage 2: + // Return the final page after draining. + response = this.GetFinalResponse(); + } + + return response; + } + } + + private QueryResponseCore GetFinalResponse() + { + List finalResult = new List(); + CosmosElement aggregationResult = this.singleGroupAggregator.GetResult(); + if (aggregationResult != null) + { + finalResult.Add(aggregationResult); + } + + QueryResponseCore response = QueryResponseCore.CreateSuccess( + result: finalResult, + requestCharge: 0, + activityId: null, + responseLengthBytes: 0, + disallowContinuationTokenMessage: null, + continuationToken: null, + diagnostics: QueryResponseCore.EmptyDiagnostics); + + return response; + } + + private QueryResponseCore GetEmptyPage(QueryResponseCore sourceResponse) + { + if (!this.TryGetContinuationToken(out string updatedContinuationToken)) + { + throw new InvalidOperationException("Failed to get source continuation token."); + } + + // We need to give empty pages until the results are fully drained. + QueryResponseCore response = QueryResponseCore.CreateSuccess( + result: EmptyResults, + requestCharge: sourceResponse.RequestCharge, + activityId: sourceResponse.ActivityId, + responseLengthBytes: sourceResponse.ResponseLengthBytes, + disallowContinuationTokenMessage: null, + continuationToken: updatedContinuationToken, + diagnostics: sourceResponse.Diagnostics); + + return response; + } + + public override bool TryGetContinuationToken(out string state) + { + if (this.IsDone) + { + state = null; + return true; + } + + if (!this.Source.TryGetContinuationToken(out string sourceState)) + { + state = null; + return false; + } + + AggregateContinuationToken aggregateContinuationToken = AggregateContinuationToken.Create( + this.singleGroupAggregator.GetContinuationToken(), + sourceState); + state = aggregateContinuationToken.ToString(); + return true; + } + + private struct AggregateContinuationToken + { + private const string SingleGroupAggregatorContinuationTokenName = "SingleGroupAggregatorContinuationToken"; + private const string SourceContinuationTokenName = "SourceContinuationToken"; + + private readonly CosmosObject rawCosmosObject; + + private AggregateContinuationToken(CosmosObject rawCosmosObject) + { + if (rawCosmosObject == null) + { + throw new ArgumentNullException(nameof(rawCosmosObject)); + } + + CosmosElement rawSingleGroupAggregatorContinuationToken = rawCosmosObject[AggregateContinuationToken.SingleGroupAggregatorContinuationTokenName]; + if (!(rawSingleGroupAggregatorContinuationToken is CosmosString singleGroupAggregatorContinuationToken)) + { + throw new ArgumentException($"{nameof(rawCosmosObject)} had a property that was not a string."); + } + + CosmosElement rawSourceContinuationToken = rawCosmosObject[AggregateContinuationToken.SourceContinuationTokenName]; + if (!(rawSourceContinuationToken is CosmosString sourceContinuationToken)) + { + throw new ArgumentException($"{nameof(rawCosmosObject)} had a property that was not a string."); + } + + this.rawCosmosObject = rawCosmosObject; + } + + public static AggregateContinuationToken Create( + string singleGroupAggregatorContinuationToken, + string sourceContinuationToken) + { + if (singleGroupAggregatorContinuationToken == null) + { + throw new ArgumentNullException(nameof(singleGroupAggregatorContinuationToken)); + } + + if (sourceContinuationToken == null) + { + throw new ArgumentNullException(nameof(sourceContinuationToken)); + } + + Dictionary dictionary = new Dictionary + { + [AggregateContinuationToken.SingleGroupAggregatorContinuationTokenName] = CosmosString.Create(singleGroupAggregatorContinuationToken), + [AggregateContinuationToken.SourceContinuationTokenName] = CosmosString.Create(sourceContinuationToken) + }; + + CosmosObject rawCosmosObject = CosmosObject.Create(dictionary); + return new AggregateContinuationToken(rawCosmosObject); + } + + public static bool TryParse( + string serializedContinuationToken, + out AggregateContinuationToken aggregateContinuationToken) + { + if (serializedContinuationToken == null) + { + throw new ArgumentNullException(nameof(serializedContinuationToken)); + } + + if (!CosmosElement.TryParse(serializedContinuationToken, out CosmosObject rawAggregateContinuationToken)) + { + aggregateContinuationToken = default; + return false; + } + + CosmosElement rawSingleGroupAggregatorContinuationToken = rawAggregateContinuationToken[AggregateContinuationToken.SingleGroupAggregatorContinuationTokenName]; + if (!(rawSingleGroupAggregatorContinuationToken is CosmosString singleGroupAggregatorContinuationToken)) + { + aggregateContinuationToken = default; + return false; + } + + CosmosElement rawSourceContinuationToken = rawAggregateContinuationToken[AggregateContinuationToken.SourceContinuationTokenName]; + if (!(rawSourceContinuationToken is CosmosString sourceContinuationToken)) + { + aggregateContinuationToken = default; + return false; + } + + aggregateContinuationToken = new AggregateContinuationToken(rawAggregateContinuationToken); + return true; + } + + public override string ToString() + { + return this.rawCosmosObject.ToString(); + } + + public string SingleGroupAggregatorContinuationToken + { + get + { + return (this.rawCosmosObject[AggregateContinuationToken.SingleGroupAggregatorContinuationTokenName] as CosmosString).Value; + } + } + + public string SourceContinuationToken + { + get + { + return (this.rawCosmosObject[AggregateContinuationToken.SourceContinuationTokenName] as CosmosString).Value; + } + } + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.cs index f06d64932f..e588a72219 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/AggregateDocumentQueryExecutionComponent.cs @@ -1,16 +1,15 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.CosmosElements; - using ClientSideRequestStatistics = Documents.ClientSideRequestStatistics; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; /// /// Execution component that is able to aggregate local aggregates from multiple continuations and partitions. @@ -22,7 +21,7 @@ namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent /// The reason why we have multiple continuations is because for a long running query we have to break up the results into multiple continuations. /// Fortunately all the aggregates can be aggregated across continuations and partitions. /// - internal sealed class AggregateDocumentQueryExecutionComponent : DocumentQueryExecutionComponentBase + internal abstract partial class AggregateDocumentQueryExecutionComponent : DocumentQueryExecutionComponentBase { /// /// This class does most of the work, since a query like: @@ -46,7 +45,7 @@ internal sealed class AggregateDocumentQueryExecutionComponent : DocumentQueryEx /// The single group aggregator that we will feed results into. /// Whether or not the query has the 'VALUE' keyword. /// This constructor is private since there is some async initialization that needs to happen in CreateAsync(). - private AggregateDocumentQueryExecutionComponent( + protected AggregateDocumentQueryExecutionComponent( IDocumentQueryExecutionComponent source, SingleGroupAggregator singleGroupAggregator, bool isValueAggregateQuery) @@ -64,91 +63,51 @@ private AggregateDocumentQueryExecutionComponent( /// /// Creates a AggregateDocumentQueryExecutionComponent. /// + /// The environment to execute on. + /// The query client. /// The aggregates. /// The alias to aggregate type. /// Whether or not the query has the 'VALUE' keyword. /// The continuation token to resume from. /// The callback to create the source component that supplies the local aggregates. /// The AggregateDocumentQueryExecutionComponent. - public static async Task CreateAsync( + public static async Task CreateAsync( + ExecutionEnvironment executionEnvironment, + CosmosQueryClient queryClient, AggregateOperator[] aggregates, IReadOnlyDictionary aliasToAggregateType, bool hasSelectValue, string requestContinuation, Func> createSourceCallback) { - return new AggregateDocumentQueryExecutionComponent( - await createSourceCallback(requestContinuation), - SingleGroupAggregator.Create(aggregates, aliasToAggregateType, hasSelectValue), - aggregates != null && aggregates.Count() == 1); - } - - /// - /// Drains at most 'maxElements' documents from the AggregateDocumentQueryExecutionComponent. - /// - /// This value is ignored, since the aggregates are aggregated for you. - /// The cancellation token. - /// The aggregate result after all the continuations have been followed. - /// - /// Note that this functions follows all continuations meaning that it won't return until all continuations are drained. - /// This means that if you have a long running query this function will take a very long time to return. - /// - public override async Task DrainAsync(int maxElements, CancellationToken token) - { - // Note-2016-10-25-felixfan: Given what we support now, we should expect to return only 1 document. - // Note-2019-07-11-brchon: We can return empty pages until all the documents are drained, - // but then we will have to design a continuation token. - - double requestCharge = 0; - long responseLengthBytes = 0; - List diagnosticsPages = new List(); - while (!this.IsDone) - { - QueryResponseCore result = await base.DrainAsync(int.MaxValue, token); - if (!result.IsSuccess) - { - return result; - } - - requestCharge += result.RequestCharge; - responseLengthBytes += result.ResponseLengthBytes; - // DEVNOTE: Add when query metrics is supported - // partitionedQueryMetrics += new PartitionedQueryMetrics(results.QueryMetrics); - if (result.diagnostics != null) - { - diagnosticsPages.AddRange(result.diagnostics); - } - - foreach (CosmosElement element in result.CosmosElements) - { - RewrittenAggregateProjections rewrittenAggregateProjections = new RewrittenAggregateProjections( - this.isValueAggregateQuery, - element); - this.singleGroupAggregator.AddValues(rewrittenAggregateProjections.Payload); - } - } - - List finalResult = new List(); - CosmosElement aggregationResult = this.singleGroupAggregator.GetResult(); - if (aggregationResult != null) + IDocumentQueryExecutionComponent aggregateDocumentQueryExecutionComponent; + switch (executionEnvironment) { - finalResult.Add(aggregationResult); + case ExecutionEnvironment.Client: + aggregateDocumentQueryExecutionComponent = await ClientAggregateDocumentQueryExecutionComponent.CreateAsync( + queryClient, + aggregates, + aliasToAggregateType, + hasSelectValue, + requestContinuation, + createSourceCallback); + break; + + case ExecutionEnvironment.Compute: + aggregateDocumentQueryExecutionComponent = await ComputeAggregateDocumentQueryExecutionComponent.CreateAsync( + queryClient, + aggregates, + aliasToAggregateType, + hasSelectValue, + requestContinuation, + createSourceCallback); + break; + + default: + throw new ArgumentException($"Unknown {nameof(ExecutionEnvironment)}: {executionEnvironment}."); } - return QueryResponseCore.CreateSuccess( - result: finalResult, - continuationToken: null, - activityId: null, - disallowContinuationTokenMessage: null, - requestCharge: requestCharge, - diagnostics: diagnosticsPages, - responseLengthBytes: responseLengthBytes); - } - - public override bool TryGetContinuationToken(out string state) - { - state = null; - return true; + return aggregateDocumentQueryExecutionComponent; } /// @@ -166,8 +125,7 @@ public RewrittenAggregateProjections(bool isValueAggregateQuery, CosmosElement r if (isValueAggregateQuery) { // SELECT VALUE [{"item": {"sum": SUM(c.blah), "count": COUNT(c.blah)}}] - CosmosArray aggregates = raw as CosmosArray; - if (aggregates == null) + if (!(raw is CosmosArray aggregates)) { throw new ArgumentException($"{nameof(RewrittenAggregateProjections)} was not an array for a value aggregate query. Type is: {raw.Type}"); } @@ -176,8 +134,7 @@ public RewrittenAggregateProjections(bool isValueAggregateQuery, CosmosElement r } else { - CosmosObject cosmosObject = raw as CosmosObject; - if (cosmosObject == null) + if (!(raw is CosmosObject cosmosObject)) { throw new ArgumentException($"{nameof(raw)} must not be an object."); } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/CosmosQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/CosmosQueryExecutionComponent.cs index 312af80c78..c1fb016336 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/CosmosQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/CosmosQueryExecutionComponent.cs @@ -1,7 +1,7 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DistinctDocumentQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DistinctDocumentQueryExecutionComponent.cs index 7e1a1cbdfa..a2c78a86ed 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DistinctDocumentQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DistinctDocumentQueryExecutionComponent.cs @@ -1,7 +1,7 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; @@ -143,7 +143,7 @@ public override async Task DrainAsync(int maxElements, Cancel disallowContinuationTokenMessage: disallowContinuationTokenMessage, activityId: cosmosQueryResponse.ActivityId, requestCharge: cosmosQueryResponse.RequestCharge, - diagnostics: cosmosQueryResponse.diagnostics, + diagnostics: cosmosQueryResponse.Diagnostics, responseLengthBytes: cosmosQueryResponse.ResponseLengthBytes); } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DocumentQueryExecutionComponentBase.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DocumentQueryExecutionComponentBase.cs index 44af7b25b4..b4060f364f 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DocumentQueryExecutionComponentBase.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/DocumentQueryExecutionComponentBase.cs @@ -1,13 +1,11 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; - using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos; /// /// Base class for all DocumentQueryExecutionComponents that implements and IDocumentQueryExecutionComponent diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/GroupByDocumentQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/GroupByDocumentQueryExecutionComponent.cs index 7447f11728..57cd736f7d 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/GroupByDocumentQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/GroupByDocumentQueryExecutionComponent.cs @@ -1,7 +1,7 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; @@ -44,6 +44,7 @@ internal sealed class GroupByDocumentQueryExecutionComponent : DocumentQueryExec private static readonly Dictionary EmptyQueryMetrics = new Dictionary(); private static readonly AggregateOperator[] EmptyAggregateOperators = new AggregateOperator[] { }; + private readonly CosmosQueryClient cosmosQueryClient; private readonly IReadOnlyDictionary groupByAliasToAggregateType; private readonly Dictionary groupingTable; private readonly bool hasSelectValue; @@ -52,16 +53,23 @@ internal sealed class GroupByDocumentQueryExecutionComponent : DocumentQueryExec private bool isDone; private GroupByDocumentQueryExecutionComponent( + CosmosQueryClient cosmosQueryClient, IReadOnlyDictionary groupByAliasToAggregateType, bool hasSelectValue, IDocumentQueryExecutionComponent source) : base(source) { + if (cosmosQueryClient == null) + { + throw new ArgumentNullException(nameof(cosmosQueryClient)); + } + if (groupByAliasToAggregateType == null) { throw new ArgumentNullException(nameof(groupByAliasToAggregateType)); } + this.cosmosQueryClient = cosmosQueryClient; this.groupingTable = new Dictionary(); this.groupByAliasToAggregateType = groupByAliasToAggregateType; this.hasSelectValue = hasSelectValue; @@ -70,6 +78,7 @@ private GroupByDocumentQueryExecutionComponent( public override bool IsDone => this.isDone; public static async Task CreateAsync( + CosmosQueryClient cosmosQueryClient, string requestContinuation, Func> createSourceCallback, IReadOnlyDictionary groupByAliasToAggregateType, @@ -77,6 +86,7 @@ public static async Task CreateAsync( { // We do not support continuation tokens for GROUP BY. return new GroupByDocumentQueryExecutionComponent( + cosmosQueryClient, groupByAliasToAggregateType, hasSelectValue, await createSourceCallback(requestContinuation)); @@ -109,9 +119,11 @@ public override async Task DrainAsync( if (!this.groupingTable.TryGetValue(groupByKeysHash, out SingleGroupAggregator singleGroupAggregator)) { singleGroupAggregator = SingleGroupAggregator.Create( + this.cosmosQueryClient, EmptyAggregateOperators, this.groupByAliasToAggregateType, - this.hasSelectValue); + this.hasSelectValue, + continuationToken: null); this.groupingTable[groupByKeysHash] = singleGroupAggregator; } @@ -126,7 +138,7 @@ public override async Task DrainAsync( disallowContinuationTokenMessage: GroupByDocumentQueryExecutionComponent.ContinuationTokenNotSupportedWithGroupBy, activityId: sourceResponse.ActivityId, requestCharge: sourceResponse.RequestCharge, - diagnostics: sourceResponse.diagnostics, + diagnostics: sourceResponse.Diagnostics, responseLengthBytes: sourceResponse.ResponseLengthBytes); this.isDone = false; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/IDocumentQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/IDocumentQueryExecutionComponent.cs index 3ac7d3989f..b660eadc8f 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/IDocumentQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/IDocumentQueryExecutionComponent.cs @@ -1,7 +1,7 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/SkipDocumentQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/SkipDocumentQueryExecutionComponent.cs index 9533b36e04..d011abca53 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/SkipDocumentQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/SkipDocumentQueryExecutionComponent.cs @@ -1,7 +1,7 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; @@ -9,7 +9,6 @@ namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent using System.Linq; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.CosmosElements; using Newtonsoft.Json; @@ -67,9 +66,9 @@ public override async Task DrainAsync(int maxElements, Cancel } // skip the documents but keep all the other headers - List documentsAfterSkip = sourcePage.CosmosElements.Skip(this.skipCount).ToList(); + IReadOnlyList documentsAfterSkip = sourcePage.CosmosElements.Skip(this.skipCount).ToList(); - int numberOfDocumentsSkipped = sourcePage.CosmosElements.Count - documentsAfterSkip.Count; + int numberOfDocumentsSkipped = sourcePage.CosmosElements.Count() - documentsAfterSkip.Count(); this.skipCount -= numberOfDocumentsSkipped; string updatedContinuationToken = null; @@ -87,7 +86,7 @@ public override async Task DrainAsync(int maxElements, Cancel disallowContinuationTokenMessage: sourcePage.DisallowContinuationTokenMessage, activityId: sourcePage.ActivityId, requestCharge: sourcePage.RequestCharge, - diagnostics: sourcePage.diagnostics, + diagnostics: sourcePage.Diagnostics, responseLengthBytes: sourcePage.ResponseLengthBytes); } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/TakeDocumentQueryExecutionComponent.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/TakeDocumentQueryExecutionComponent.cs index b5bd73c81a..4c24f97a60 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/TakeDocumentQueryExecutionComponent.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionComponent/TakeDocumentQueryExecutionComponent.cs @@ -1,17 +1,14 @@ //------------------------------------------------------------ // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------ -namespace Microsoft.Azure.Cosmos.Query.ExecutionComponent +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent { using System; using System.Collections.Generic; using System.Globalization; using System.Linq; - using System.Net; - using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.CosmosElements; using Newtonsoft.Json; @@ -135,7 +132,7 @@ public override async Task DrainAsync(int maxElements, Cancel disallowContinuationTokenMessage: results.DisallowContinuationTokenMessage, activityId: results.ActivityId, requestCharge: results.RequestCharge, - diagnostics: results.diagnostics, + diagnostics: results.Diagnostics, responseLengthBytes: results.ResponseLengthBytes); } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs index 4e7745c7c7..0e08febca4 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.Cosmos.Query using System.Threading; using System.Threading.Tasks; using Collections.Generic; - using ExecutionComponent; + using Core.ExecutionComponent; using Microsoft.Azure.Cosmos.CosmosElements; using ParallelQuery; using PartitionKeyRange = Documents.PartitionKeyRange; diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosQueryExecutionContextFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosQueryExecutionContextFactory.cs index 4ff607dee5..290eed6854 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosQueryExecutionContextFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosQueryExecutionContextFactory.cs @@ -14,6 +14,7 @@ namespace Microsoft.Azure.Cosmos.Query using global::Azure.Cosmos; #endif using Microsoft.Azure.Cosmos; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; using Microsoft.Azure.Cosmos.Query.ParallelQuery; /// @@ -336,6 +337,7 @@ public static async Task CreateSpecializedDocumentQ maxBufferedItemCount: inputParameters.MaxBufferedItemCount); return await PipelinedDocumentQueryExecutionContext.CreateAsync( + inputParameters.ExecutionEnvironment, cosmosQueryContext, initParams, inputParameters.InitialUserContinuationToken, @@ -444,6 +446,7 @@ public struct InputParameters internal PartitionKey? PartitionKey { get; set; } internal IDictionary Properties { get; set; } internal PartitionedQueryExecutionInfo PartitionedQueryExecutionInfo { get; set; } + internal ExecutionEnvironment ExecutionEnvironment { get; set; } } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/ExecutionEnvironment.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/ExecutionEnvironment.cs new file mode 100644 index 0000000000..46ae77d7bd --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/ExecutionEnvironment.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext +{ + /// + /// Environment the query is going to be executed on. + /// + internal enum ExecutionEnvironment + { + /// + /// Query is being executed on a 3rd party client. + /// + Client, + + /// + /// Query is being executed on the compute gateway. + /// + Compute, + } +} diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/PipelinedDocumentQueryExecutionContext.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/PipelinedDocumentQueryExecutionContext.cs index 7bcc81cccb..6f5ef620bc 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/PipelinedDocumentQueryExecutionContext.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/PipelinedDocumentQueryExecutionContext.cs @@ -5,18 +5,16 @@ namespace Microsoft.Azure.Cosmos.Query { using System; - using System.Collections.Generic; using System.Globalization; - using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos.Core.Trace; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; - using Microsoft.Azure.Cosmos.Query.ExecutionComponent; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; using Microsoft.Azure.Documents.Collections; - using PartitionKeyRange = Documents.PartitionKeyRange; /// /// You can imagine the pipeline to be a directed acyclic graph where documents flow from multiple sources (the partitions) to a single sink (the client who calls on ExecuteNextAsync()). @@ -132,12 +130,14 @@ public override bool TryGetContinuationToken(out string state) /// /// Creates a CosmosPipelinedItemQueryExecutionContext. /// + /// The environment to execute on. /// The parameters for constructing the base class. /// The initial parameters /// The request continuation. /// The cancellation token. /// A task to await on, which in turn returns a CosmosPipelinedItemQueryExecutionContext. public static async Task CreateAsync( + ExecutionEnvironment executionEnvironment, CosmosQueryContext queryContext, CosmosCrossPartitionQueryExecutionContext.CrossPartitionInitParams initParams, string requestContinuationToken, @@ -209,6 +209,7 @@ public static async Task CreateAsync( }; return (CosmosQueryExecutionContext)await PipelinedDocumentQueryExecutionContext.CreateHelperAsync( + executionEnvironment, queryContext.QueryClient, initParams.PartitionedQueryExecutionInfo.QueryInfo, initialPageSize, @@ -218,6 +219,7 @@ public static async Task CreateAsync( } private static async Task CreateHelperAsync( + ExecutionEnvironment executionEnvironment, CosmosQueryClient queryClient, QueryInfo queryInfo, int initialPageSize, @@ -241,6 +243,8 @@ private static async Task CreateHelperAs createComponentFunc = async (continuationToken) => { return await AggregateDocumentQueryExecutionComponent.CreateAsync( + executionEnvironment, + queryClient, queryInfo.Aggregates, queryInfo.GroupByAliasToAggregateType, queryInfo.HasSelectValue, @@ -268,6 +272,7 @@ private static async Task CreateHelperAs createComponentFunc = async (continuationToken) => { return await GroupByDocumentQueryExecutionComponent.CreateAsync( + queryClient, continuationToken, createSourceCallback, queryInfo.GroupByAliasToAggregateType, @@ -383,7 +388,7 @@ public override async Task ExecuteNextAsync(CancellationToken disallowContinuationTokenMessage: queryResponse.DisallowContinuationTokenMessage, activityId: queryResponse.ActivityId, requestCharge: queryResponse.RequestCharge, - diagnostics: queryResponse.diagnostics, + diagnostics: queryResponse.Diagnostics, responseLengthBytes: queryResponse.ResponseLengthBytes); } catch (Exception) diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/ItemProducerTree/ItemProducer.cs b/Microsoft.Azure.Cosmos/src/Query/Core/ItemProducerTree/ItemProducer.cs index d16c6ef4d7..c003854ada 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/ItemProducerTree/ItemProducer.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/ItemProducerTree/ItemProducer.cs @@ -323,7 +323,7 @@ public async Task BufferMoreDocumentsAsync(CancellationToken token) this.produceAsyncCompleteCallback( feedResponse.CosmosElements.Count, feedResponse.RequestCharge, - feedResponse.diagnostics, + feedResponse.Diagnostics, feedResponse.ResponseLengthBytes, token); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/QueryResponseCore.cs b/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/QueryResponseCore.cs index 51dbe87330..c3edc05e11 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/QueryResponseCore.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/QueryClient/QueryResponseCore.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.Query { using System; using System.Collections.Generic; + using System.Linq; using System.Net; using Microsoft.Azure.Cosmos.CosmosElements; using IClientSideRequestStatistics = Documents.IClientSideRequestStatistics; @@ -12,7 +13,7 @@ namespace Microsoft.Azure.Cosmos.Query internal struct QueryResponseCore { - private static readonly IReadOnlyList EmptyList = new List(); + private static readonly IReadOnlyList EmptyList = new List().AsReadOnly(); internal static readonly IReadOnlyCollection EmptyDiagnostics = new List(); private QueryResponseCore( @@ -32,7 +33,7 @@ private QueryResponseCore( this.CosmosElements = result; this.StatusCode = statusCode; this.ActivityId = activityId; - this.diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics)); + this.Diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics)); this.ResponseLengthBytes = responseLengthBytes; this.RequestCharge = requestCharge; this.DisallowContinuationTokenMessage = disallowContinuationTokenMessage; @@ -57,7 +58,7 @@ private QueryResponseCore( internal string ActivityId { get; } - internal IReadOnlyCollection diagnostics { get; } + internal IReadOnlyCollection Diagnostics { get; } internal long ResponseLengthBytes { get; } diff --git a/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs b/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs index 96b9b0fee0..bb5ddc9327 100644 --- a/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs +++ b/Microsoft.Azure.Cosmos/src/Query/v3Query/QueryIterator.cs @@ -62,7 +62,8 @@ public static QueryIterator Create( MaxItemCount = queryRequestOptions.MaxItemCount, PartitionKey = queryRequestOptions.PartitionKey, Properties = queryRequestOptions.Properties, - PartitionedQueryExecutionInfo = partitionedQueryExecutionInfo + PartitionedQueryExecutionInfo = partitionedQueryExecutionInfo, + ExecutionEnvironment = queryRequestOptions.ExecutionEnvironment.GetValueOrDefault(Core.ExecutionContext.ExecutionEnvironment.Client), }; return new QueryIterator( @@ -82,7 +83,7 @@ public static QueryIterator Create( { QueryResponseCore responseCore = await this.cosmosQueryExecutionContext.ExecuteNextAsync(cancellationToken); CosmosQueryContext cosmosQueryContext = this.cosmosQueryExecutionContext.CosmosQueryContext; - QueryAggregateDiagnostics diagnostics = new QueryAggregateDiagnostics(responseCore.diagnostics); + QueryAggregateDiagnostics diagnostics = new QueryAggregateDiagnostics(responseCore.Diagnostics); QueryResponse queryResponse; if (responseCore.IsSuccess) { diff --git a/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs b/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs index 000d58dfda..8487899b4d 100644 --- a/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs +++ b/Microsoft.Azure.Cosmos/src/RequestOptions/QueryRequestOptions.cs @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos { using System; using System.Globalization; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext; using Microsoft.Azure.Documents; /// @@ -141,6 +142,8 @@ public ConsistencyLevel? ConsistencyLevel internal CosmosSerializationFormatOptions CosmosSerializationFormatOptions { get; set; } + internal ExecutionEnvironment? ExecutionEnvironment { get; set; } + /// /// Fill the CosmosRequestMessage headers with the set properties /// diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CrossPartitionQueryTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CrossPartitionQueryTests.cs index 8be1d1d9ea..2432bf0f4f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CrossPartitionQueryTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CrossPartitionQueryTests.cs @@ -19,7 +19,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Threading.Tasks; using System.Xml; using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Query.ExecutionComponent; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; using Microsoft.Azure.Cosmos.Routing; using Microsoft.Azure.Documents; using Microsoft.Azure.Documents.Routing; @@ -631,9 +631,12 @@ private static async Task> QueryWithContinuationTokens( string state = null; do { + QueryRequestOptions computeRequestOptions = queryRequestOptions.Clone(); + computeRequestOptions.ExecutionEnvironment = Cosmos.Query.Core.ExecutionContext.ExecutionEnvironment.Compute; + FeedIterator itemQuery = container.GetItemQueryIterator( queryText: query, - requestOptions: queryRequestOptions, + requestOptions: computeRequestOptions, continuationToken: state); FeedResponse cosmosQueryResponse = await itemQuery.ReadNextAsync(); @@ -2047,7 +2050,7 @@ public async Task TestNonValueAggregates() await this.CreateIngestQueryDelete( ConnectionModes.Direct, - CollectionTypes.SinglePartition, + CollectionTypes.SinglePartition | CollectionTypes.MultiPartition, documents, this.TestNonValueAggregates); } @@ -2246,7 +2249,7 @@ private async Task TestNonValueAggregates( { foreach (int maxItemCount in new int[] { 1, 5, 10 }) { - List actual = await QueryWithoutContinuationTokens( + List actual = await RunQuery( container: container, query: query, queryRequestOptions: new QueryRequestOptions() diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BinaryEncodingOverTheWireTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BinaryEncodingOverTheWireTests.cs index b55e837844..6f626db788 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BinaryEncodingOverTheWireTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BinaryEncodingOverTheWireTests.cs @@ -1,470 +1,470 @@ -//----------------------------------------------------------------------- -// -// Copyright (c) Microsoft Corporation. All rights reserved. -// -//----------------------------------------------------------------------- -namespace Microsoft.Azure.Cosmos.Tests -{ - using System; - using System.Collections.Generic; - using System.Configuration; - using System.Linq; - using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.Json; - using Microsoft.VisualStudio.TestTools.UnitTesting; - using Newtonsoft.Json; - using Newtonsoft.Json.Linq; - - /// - /// Tests for CrossPartitionQueryTests. - /// - [TestClass] - [TestCategory("Quarantine") /* Used to filter out quarantined tests in gated runs */] - public class BinaryEncodingOverTheWireTests - { - private static readonly string[] NoDocuments = new string[] { }; - - private static readonly CosmosClient GatewayClient = new CosmosClient( - ConfigurationManager.AppSettings["GatewayEndpoint"], - ConfigurationManager.AppSettings["MasterKey"], - new CosmosClientOptions() { ConnectionMode = ConnectionMode.Gateway }); - private static readonly CosmosClient DirectHttpsClient = new CosmosClient( - ConfigurationManager.AppSettings["GatewayEndpoint"], - ConfigurationManager.AppSettings["MasterKey"], - new CosmosClientOptions() { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Documents.Client.Protocol.Https }); - private static readonly CosmosClient RntbdClient = new CosmosClient( - ConfigurationManager.AppSettings["GatewayEndpoint"], - ConfigurationManager.AppSettings["MasterKey"], - new CosmosClientOptions() { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Documents.Client.Protocol.Tcp }); - private static readonly CosmosClient[] Clients = new CosmosClient[] { GatewayClient, DirectHttpsClient, RntbdClient }; - private static readonly CosmosClient Client = RntbdClient; - private static readonly AsyncLazy Database = new AsyncLazy(async () => - { - return await Client.CreateDatabaseAsync(Guid.NewGuid().ToString()); - }); - - private static async Task CreateContainerAsync() - { - return (await Database.Value).CreateContainerAsync( - Guid.NewGuid().ToString() + "collection", - "/id", - 10000).Result; - } - - private static async Task>> CreateCollectionAndIngestDocuments(IEnumerable documents) - { - Container container = await BinaryEncodingOverTheWireTests.CreateContainerAsync(); - List insertedDocuments = new List(); - Random rand = new Random(1234); - foreach (string serializedItem in documents.OrderBy(x => rand.Next()).Take(100)) - { - JToken item = JToken.Parse(serializedItem); - item["id"] = Guid.NewGuid().ToString(); - JToken createdItem = await container.CreateItemAsync(item, new PartitionKey(item["id"].ToString())); - insertedDocuments.Add(createdItem); - } - - return new Tuple>(container, insertedDocuments); - } - - internal delegate Task Query(CosmosClient cosmosClient, Container container, List items); - - /// - /// Task that wraps boiler plate code for query tests (collection create -> ingest documents -> query documents -> delete collections). - /// Note that this function will take the cross product connectionModes and collectionTypes. - /// - /// The connection modes to use. - /// The type of collections to create. - /// The documents to ingest - /// - /// The callback for the queries. - /// All the standard arguments will be passed in. - /// Please make sure that this function is idempotent, since a collection will be reused for each connection mode. - /// - /// The partition key for the partition collection. - /// The optional args that you want passed in to the query. - /// A task to await on. - private static async Task CreateIngestQueryDelete( - IEnumerable documents, - Query query) - { - Tuple> collectionAndDocuments = await BinaryEncodingOverTheWireTests.CreateCollectionAndIngestDocuments(documents); - - List queryTasks = new List(); - foreach (CosmosClient cosmosClient in BinaryEncodingOverTheWireTests.Clients) - { - queryTasks.Add(query(cosmosClient, collectionAndDocuments.Item1, collectionAndDocuments.Item2)); - } - - await Task.WhenAll(queryTasks); - - await collectionAndDocuments.Item1.DeleteContainerAsync(); - } - - private static async Task NoOp() - { - await Task.Delay(0); - } - - [TestMethod] - public void CheckThatAllTestsAreRunning() - { - // In general I don't want any of these tests being ignored or quarentined. - // Please work with me if it needs to be. - // I do not want these tests turned off for being "flaky", since they have been - // very stable and if they fail it's because something lower level is probably going wrong. - - Assert.AreEqual(0, typeof(BinaryEncodingOverTheWireTests) - .GetMethods() - .Where(method => method.GetCustomAttributes(typeof(TestMethodAttribute), true).Length != 0) - .Where(method => method.GetCustomAttributes(typeof(TestCategoryAttribute), true).Length != 0) - .Count(), $"One the {nameof(BinaryEncodingOverTheWireTests)} is not being run."); - } - - [TestMethod] - public async Task CombinedScriptsDataTest() - { - await this.TestCurratedDocs("CombinedScriptsData.json"); - } - - // For now we are skipping this test since the documents are too large to ingest and we get a rate size too large (HTTP 413). -#if TEST_COUNTRY - [TestMethod] - public async Task CountriesTest() - { - await this.TestCurratedDocs("countries"); - } -#endif - - [TestMethod] - public async Task DevTestCollTest() - { - await this.TestCurratedDocs("devtestcoll.json"); - } - - [TestMethod] - public async Task LastFMTest() - { - await this.TestCurratedDocs("lastfm"); - } - - [TestMethod] - public async Task LogDataTest() - { - await this.TestCurratedDocs("LogData.json"); - } - - [TestMethod] - public async Task MillionSong1KDocumentsTest() - { - await this.TestCurratedDocs("MillionSong1KDocuments.json"); - } - - [TestMethod] - public async Task MsnCollectionTest() - { - await this.TestCurratedDocs("MsnCollection.json"); - } - - [TestMethod] - public async Task NutritionDataTest() - { - await this.TestCurratedDocs("NutritionData"); - } - - [TestMethod] - public async Task RunsCollectionTest() - { - await this.TestCurratedDocs("runsCollection"); - } - - [TestMethod] - public async Task StatesCommitteesTest() - { - await this.TestCurratedDocs("states_committees.json"); - } - - [TestMethod] - public async Task StatesLegislatorsTest() - { - await this.TestCurratedDocs("states_legislators"); - } - - [TestMethod] - public async Task Store01Test() - { - await this.TestCurratedDocs("store01C.json"); - } - - [TestMethod] - public async Task TicinoErrorBucketsTest() - { - await this.TestCurratedDocs("TicinoErrorBuckets"); - } - - [TestMethod] - public async Task TwitterDataTest() - { - await this.TestCurratedDocs("twitter_data"); - } - - [TestMethod] - public async Task Ups1Test() - { - await this.TestCurratedDocs("ups1"); - } - - [TestMethod] - public async Task XpertEventsTest() - { - await this.TestCurratedDocs("XpertEvents"); - } - - private async Task TestCurratedDocs(string path) - { - IEnumerable documents = BinaryEncodingOverTheWireTests.GetDocumentsFromCurratedDoc(path); - await BinaryEncodingOverTheWireTests.CreateIngestQueryDelete( - documents.Select(x => x.ToString()), - this.TestCurratedDocs); - } - - private async Task TestCurratedDocs(CosmosClient cosmosClient, Container container, List items) - { - HashSet inputItems = new HashSet(items, JsonTokenEqualityComparer.Value); - - async Task AssertQueryDrainsCorrectlyAsync(FeedIterator feedIterator) - { - while (feedIterator.HasMoreResults) - { - FeedResponse feedResponse = await feedIterator.ReadNextAsync(); - foreach (JToken item in feedResponse) - { - Assert.IsTrue(inputItems.Contains(item), "Documents differ from input documents"); - } - } - } - - FeedIterator textFeedIterator = container.GetItemQueryIterator( - queryDefinition: new QueryDefinition("SELECT * FROM c ORDER BY c._ts"), - requestOptions: new QueryRequestOptions() - { - CosmosSerializationFormatOptions = new CosmosSerializationFormatOptions( - "JsonText", - (content) => JsonNavigator.Create(content), - () => Cosmos.Json.JsonWriter.Create(JsonSerializationFormat.Text)), - }); - - await AssertQueryDrainsCorrectlyAsync(textFeedIterator); - - FeedIterator binaryFeedIterator = container.GetItemQueryIterator( - queryDefinition: new QueryDefinition("SELECT * FROM c ORDER BY c._ts"), - requestOptions: new QueryRequestOptions() - { - CosmosSerializationFormatOptions = new CosmosSerializationFormatOptions( - "CosmosBinary", - (content) => JsonNavigator.Create(content), - () => Cosmos.Json.JsonWriter.Create(JsonSerializationFormat.Text)), - }); - - await AssertQueryDrainsCorrectlyAsync(binaryFeedIterator); - } - - private static IEnumerable GetDocumentsFromCurratedDoc(string path) - { - path = string.Format("TestJsons/{0}", path); - string json = TextFileConcatenation.ReadMultipartFile(path); - List documents; - try - { - documents = JsonConvert.DeserializeObject>(json); - } - catch (JsonSerializationException) - { - documents = new List - { - JsonConvert.DeserializeObject(json) - }; - } - - return documents; - } - - public sealed class AsyncLazy : Lazy> - { - public AsyncLazy(Func valueFactory) : - base(() => Task.Factory.StartNew(valueFactory)) - { } - - public AsyncLazy(Func> taskFactory) : - base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap()) - { } - } - - public sealed class JsonTokenEqualityComparer : IEqualityComparer - { - public static JsonTokenEqualityComparer Value = new JsonTokenEqualityComparer(); - - public bool Equals(double double1, double double2) - { - return double1 == double2; - } - - public bool Equals(string string1, string string2) - { - return string1.Equals(string2); - } - - public bool Equals(bool bool1, bool bool2) - { - return bool1 == bool2; - } - - public bool Equals(JArray jArray1, JArray jArray2) - { - if (jArray1.Count != jArray2.Count) - { - return false; - } - - IEnumerable> pairwiseElements = jArray1 - .Zip(jArray2, (first, second) => new Tuple(first, second)); - bool deepEquals = true; - foreach (Tuple pairwiseElement in pairwiseElements) - { - deepEquals &= this.Equals(pairwiseElement.Item1, pairwiseElement.Item2); - } - - return deepEquals; - } - - public bool Equals(JObject jObject1, JObject jObject2) - { - if (jObject1.Count != jObject2.Count) - { - return false; - } - - bool deepEquals = true; - foreach (KeyValuePair kvp in jObject1) - { - string name = kvp.Key; - JToken value1 = kvp.Value; - - JToken value2; - if (jObject2.TryGetValue(name, out value2)) - { - deepEquals &= this.Equals(value1, value2); - } - else - { - return false; - } - } - - return deepEquals; - } - - public bool Equals(JToken jToken1, JToken jToken2) - { - if (object.ReferenceEquals(jToken1, jToken2)) - { - return true; - } - - if (jToken1 == null || jToken2 == null) - { - return false; - } - - JsonType type1 = JTokenTypeToJsonType(jToken1.Type); - JsonType type2 = JTokenTypeToJsonType(jToken2.Type); - - // If the types don't match - if (type1 != type2) - { - return false; - } - - switch (type1) - { - - case JsonType.Object: - return this.Equals((JObject)jToken1, (JObject)jToken2); - case JsonType.Array: - return this.Equals((JArray)jToken1, (JArray)jToken2); - case JsonType.Number: - // NOTE: Some double values in the test document cannot be represented exactly as double. These values get some - // additional decimals at the end. So instead of comparing for equality, we need to find the diff and check - // if it is within the acceptable limit. One example of such an value is 00324008. - return Math.Abs((double)jToken1 - (double)jToken2) <= 1E-9; - case JsonType.String: - // TODO: Newtonsoft reader treats string representing datetime as type Date and doing a ToString returns - // a string that is not in the original format. In case of our binary reader we treat datetime as string - // and return the original string, so this comparison doesn't work for datetime. For now, we are skipping - // date type comparison. Will enable it after fixing this discrepancy - if (jToken1.Type == JTokenType.Date || jToken2.Type == JTokenType.Date) - { - return true; - } - - return this.Equals(jToken1.ToString(), jToken2.ToString()); - case JsonType.Boolean: - return this.Equals((bool)jToken1, (bool)jToken2); - case JsonType.Null: - return true; - default: - throw new ArgumentException(); - } - } - - public int GetHashCode(JToken obj) - { - return 0; - } - - private enum JsonType - { - Number, - String, - Null, - Array, - Object, - Boolean - } - - private static JsonType JTokenTypeToJsonType(JTokenType type) - { - switch (type) - { - - case JTokenType.Object: - return JsonType.Object; - case JTokenType.Array: - return JsonType.Array; - case JTokenType.Integer: - case JTokenType.Float: - return JsonType.Number; - case JTokenType.Guid: - case JTokenType.Uri: - case JTokenType.TimeSpan: - case JTokenType.Date: - case JTokenType.String: - return JsonType.String; - case JTokenType.Boolean: - return JsonType.Boolean; - case JTokenType.Null: - return JsonType.Null; - case JTokenType.None: - case JTokenType.Undefined: - case JTokenType.Constructor: - case JTokenType.Property: - case JTokenType.Comment: - case JTokenType.Raw: - case JTokenType.Bytes: - default: - throw new ArgumentException(); - } - } - } - } +//----------------------------------------------------------------------- +// +// Copyright (c) Microsoft Corporation. All rights reserved. +// +//----------------------------------------------------------------------- +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.Collections.Generic; + using System.Configuration; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Json; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; + using Newtonsoft.Json.Linq; + + /// + /// Tests for CrossPartitionQueryTests. + /// + [TestClass] + [TestCategory("Quarantine") /* Used to filter out quarantined tests in gated runs */] + public class BinaryEncodingOverTheWireTests + { + private static readonly string[] NoDocuments = new string[] { }; + + private static readonly CosmosClient GatewayClient = new CosmosClient( + ConfigurationManager.AppSettings["GatewayEndpoint"], + ConfigurationManager.AppSettings["MasterKey"], + new CosmosClientOptions() { ConnectionMode = ConnectionMode.Gateway }); + private static readonly CosmosClient DirectHttpsClient = new CosmosClient( + ConfigurationManager.AppSettings["GatewayEndpoint"], + ConfigurationManager.AppSettings["MasterKey"], + new CosmosClientOptions() { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Documents.Client.Protocol.Https }); + private static readonly CosmosClient RntbdClient = new CosmosClient( + ConfigurationManager.AppSettings["GatewayEndpoint"], + ConfigurationManager.AppSettings["MasterKey"], + new CosmosClientOptions() { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Documents.Client.Protocol.Tcp }); + private static readonly CosmosClient[] Clients = new CosmosClient[] { GatewayClient, DirectHttpsClient, RntbdClient }; + private static readonly CosmosClient Client = RntbdClient; + private static readonly AsyncLazy Database = new AsyncLazy(async () => + { + return await Client.CreateDatabaseAsync(Guid.NewGuid().ToString()); + }); + + private static async Task CreateContainerAsync() + { + return (await Database.Value).CreateContainerAsync( + Guid.NewGuid().ToString() + "collection", + "/id", + 10000).Result; + } + + private static async Task>> CreateCollectionAndIngestDocuments(IEnumerable documents) + { + Container container = await BinaryEncodingOverTheWireTests.CreateContainerAsync(); + List insertedDocuments = new List(); + Random rand = new Random(1234); + foreach (string serializedItem in documents.OrderBy(x => rand.Next()).Take(100)) + { + JToken item = JToken.Parse(serializedItem); + item["id"] = Guid.NewGuid().ToString(); + JToken createdItem = await container.CreateItemAsync(item, new PartitionKey(item["id"].ToString())); + insertedDocuments.Add(createdItem); + } + + return new Tuple>(container, insertedDocuments); + } + + internal delegate Task Query(CosmosClient cosmosClient, Container container, List items); + + /// + /// Task that wraps boiler plate code for query tests (collection create -> ingest documents -> query documents -> delete collections). + /// Note that this function will take the cross product connectionModes and collectionTypes. + /// + /// The connection modes to use. + /// The type of collections to create. + /// The documents to ingest + /// + /// The callback for the queries. + /// All the standard arguments will be passed in. + /// Please make sure that this function is idempotent, since a collection will be reused for each connection mode. + /// + /// The partition key for the partition collection. + /// The optional args that you want passed in to the query. + /// A task to await on. + private static async Task CreateIngestQueryDelete( + IEnumerable documents, + Query query) + { + Tuple> collectionAndDocuments = await BinaryEncodingOverTheWireTests.CreateCollectionAndIngestDocuments(documents); + + List queryTasks = new List(); + foreach (CosmosClient cosmosClient in BinaryEncodingOverTheWireTests.Clients) + { + queryTasks.Add(query(cosmosClient, collectionAndDocuments.Item1, collectionAndDocuments.Item2)); + } + + await Task.WhenAll(queryTasks); + + await collectionAndDocuments.Item1.DeleteContainerAsync(); + } + + private static async Task NoOp() + { + await Task.Delay(0); + } + + [TestMethod] + public void CheckThatAllTestsAreRunning() + { + // In general I don't want any of these tests being ignored or quarentined. + // Please work with me if it needs to be. + // I do not want these tests turned off for being "flaky", since they have been + // very stable and if they fail it's because something lower level is probably going wrong. + + Assert.AreEqual(0, typeof(BinaryEncodingOverTheWireTests) + .GetMethods() + .Where(method => method.GetCustomAttributes(typeof(TestMethodAttribute), true).Length != 0) + .Where(method => method.GetCustomAttributes(typeof(TestCategoryAttribute), true).Length != 0) + .Count(), $"One the {nameof(BinaryEncodingOverTheWireTests)} is not being run."); + } + + [TestMethod] + public async Task CombinedScriptsDataTest() + { + await this.TestCurratedDocs("CombinedScriptsData.json"); + } + + // For now we are skipping this test since the documents are too large to ingest and we get a rate size too large (HTTP 413). +#if TEST_COUNTRY + [TestMethod] + public async Task CountriesTest() + { + await this.TestCurratedDocs("countries"); + } +#endif + + [TestMethod] + public async Task DevTestCollTest() + { + await this.TestCurratedDocs("devtestcoll.json"); + } + + [TestMethod] + public async Task LastFMTest() + { + await this.TestCurratedDocs("lastfm"); + } + + [TestMethod] + public async Task LogDataTest() + { + await this.TestCurratedDocs("LogData.json"); + } + + [TestMethod] + public async Task MillionSong1KDocumentsTest() + { + await this.TestCurratedDocs("MillionSong1KDocuments.json"); + } + + [TestMethod] + public async Task MsnCollectionTest() + { + await this.TestCurratedDocs("MsnCollection.json"); + } + + [TestMethod] + public async Task NutritionDataTest() + { + await this.TestCurratedDocs("NutritionData"); + } + + [TestMethod] + public async Task RunsCollectionTest() + { + await this.TestCurratedDocs("runsCollection"); + } + + [TestMethod] + public async Task StatesCommitteesTest() + { + await this.TestCurratedDocs("states_committees.json"); + } + + [TestMethod] + public async Task StatesLegislatorsTest() + { + await this.TestCurratedDocs("states_legislators"); + } + + [TestMethod] + public async Task Store01Test() + { + await this.TestCurratedDocs("store01C.json"); + } + + [TestMethod] + public async Task TicinoErrorBucketsTest() + { + await this.TestCurratedDocs("TicinoErrorBuckets"); + } + + [TestMethod] + public async Task TwitterDataTest() + { + await this.TestCurratedDocs("twitter_data"); + } + + [TestMethod] + public async Task Ups1Test() + { + await this.TestCurratedDocs("ups1"); + } + + [TestMethod] + public async Task XpertEventsTest() + { + await this.TestCurratedDocs("XpertEvents"); + } + + private async Task TestCurratedDocs(string path) + { + IEnumerable documents = BinaryEncodingOverTheWireTests.GetDocumentsFromCurratedDoc(path); + await BinaryEncodingOverTheWireTests.CreateIngestQueryDelete( + documents.Select(x => x.ToString()), + this.TestCurratedDocs); + } + + private async Task TestCurratedDocs(CosmosClient cosmosClient, Container container, List items) + { + HashSet inputItems = new HashSet(items, JsonTokenEqualityComparer.Value); + + async Task AssertQueryDrainsCorrectlyAsync(FeedIterator feedIterator) + { + while (feedIterator.HasMoreResults) + { + FeedResponse feedResponse = await feedIterator.ReadNextAsync(); + foreach (JToken item in feedResponse) + { + Assert.IsTrue(inputItems.Contains(item), "Documents differ from input documents"); + } + } + } + + FeedIterator textFeedIterator = container.GetItemQueryIterator( + queryDefinition: new QueryDefinition("SELECT * FROM c ORDER BY c._ts"), + requestOptions: new QueryRequestOptions() + { + CosmosSerializationFormatOptions = new CosmosSerializationFormatOptions( + "JsonText", + (content) => JsonNavigator.Create(content), + () => Cosmos.Json.JsonWriter.Create(JsonSerializationFormat.Text)), + }); + + await AssertQueryDrainsCorrectlyAsync(textFeedIterator); + + FeedIterator binaryFeedIterator = container.GetItemQueryIterator( + queryDefinition: new QueryDefinition("SELECT * FROM c ORDER BY c._ts"), + requestOptions: new QueryRequestOptions() + { + CosmosSerializationFormatOptions = new CosmosSerializationFormatOptions( + "CosmosBinary", + (content) => JsonNavigator.Create(content), + () => Cosmos.Json.JsonWriter.Create(JsonSerializationFormat.Text)), + }); + + await AssertQueryDrainsCorrectlyAsync(binaryFeedIterator); + } + + private static IEnumerable GetDocumentsFromCurratedDoc(string path) + { + path = string.Format("TestJsons/{0}", path); + string json = TextFileConcatenation.ReadMultipartFile(path); + List documents; + try + { + documents = JsonConvert.DeserializeObject>(json); + } + catch (JsonSerializationException) + { + documents = new List + { + JsonConvert.DeserializeObject(json) + }; + } + + return documents; + } + + public sealed class AsyncLazy : Lazy> + { + public AsyncLazy(Func valueFactory) : + base(() => Task.Factory.StartNew(valueFactory)) + { } + + public AsyncLazy(Func> taskFactory) : + base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap()) + { } + } + + public sealed class JsonTokenEqualityComparer : IEqualityComparer + { + public static JsonTokenEqualityComparer Value = new JsonTokenEqualityComparer(); + + public bool Equals(double double1, double double2) + { + return double1 == double2; + } + + public bool Equals(string string1, string string2) + { + return string1.Equals(string2); + } + + public bool Equals(bool bool1, bool bool2) + { + return bool1 == bool2; + } + + public bool Equals(JArray jArray1, JArray jArray2) + { + if (jArray1.Count != jArray2.Count) + { + return false; + } + + IEnumerable> pairwiseElements = jArray1 + .Zip(jArray2, (first, second) => new Tuple(first, second)); + bool deepEquals = true; + foreach (Tuple pairwiseElement in pairwiseElements) + { + deepEquals &= this.Equals(pairwiseElement.Item1, pairwiseElement.Item2); + } + + return deepEquals; + } + + public bool Equals(JObject jObject1, JObject jObject2) + { + if (jObject1.Count != jObject2.Count) + { + return false; + } + + bool deepEquals = true; + foreach (KeyValuePair kvp in jObject1) + { + string name = kvp.Key; + JToken value1 = kvp.Value; + + JToken value2; + if (jObject2.TryGetValue(name, out value2)) + { + deepEquals &= this.Equals(value1, value2); + } + else + { + return false; + } + } + + return deepEquals; + } + + public bool Equals(JToken jToken1, JToken jToken2) + { + if (object.ReferenceEquals(jToken1, jToken2)) + { + return true; + } + + if (jToken1 == null || jToken2 == null) + { + return false; + } + + JsonType type1 = JTokenTypeToJsonType(jToken1.Type); + JsonType type2 = JTokenTypeToJsonType(jToken2.Type); + + // If the types don't match + if (type1 != type2) + { + return false; + } + + switch (type1) + { + + case JsonType.Object: + return this.Equals((JObject)jToken1, (JObject)jToken2); + case JsonType.Array: + return this.Equals((JArray)jToken1, (JArray)jToken2); + case JsonType.Number: + // NOTE: Some double values in the test document cannot be represented exactly as double. These values get some + // additional decimals at the end. So instead of comparing for equality, we need to find the diff and check + // if it is within the acceptable limit. One example of such an value is 00324008. + return Math.Abs((double)jToken1 - (double)jToken2) <= 1E-9; + case JsonType.String: + // TODO: Newtonsoft reader treats string representing datetime as type Date and doing a ToString returns + // a string that is not in the original format. In case of our binary reader we treat datetime as string + // and return the original string, so this comparison doesn't work for datetime. For now, we are skipping + // date type comparison. Will enable it after fixing this discrepancy + if (jToken1.Type == JTokenType.Date || jToken2.Type == JTokenType.Date) + { + return true; + } + + return this.Equals(jToken1.ToString(), jToken2.ToString()); + case JsonType.Boolean: + return this.Equals((bool)jToken1, (bool)jToken2); + case JsonType.Null: + return true; + default: + throw new ArgumentException(); + } + } + + public int GetHashCode(JToken obj) + { + return 0; + } + + private enum JsonType + { + Number, + String, + Null, + Array, + Object, + Boolean + } + + private static JsonType JTokenTypeToJsonType(JTokenType type) + { + switch (type) + { + + case JTokenType.Object: + return JsonType.Object; + case JTokenType.Array: + return JsonType.Array; + case JTokenType.Integer: + case JTokenType.Float: + return JsonType.Number; + case JTokenType.Guid: + case JTokenType.Uri: + case JTokenType.TimeSpan: + case JTokenType.Date: + case JTokenType.String: + return JsonType.String; + case JTokenType.Boolean: + return JsonType.Boolean; + case JTokenType.Null: + return JsonType.Null; + case JTokenType.None: + case JTokenType.Undefined: + case JTokenType.Constructor: + case JTokenType.Property: + case JTokenType.Comment: + case JTokenType.Raw: + case JTokenType.Bytes: + default: + throw new ArgumentException(); + } + } + } + } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosQueryUnitTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosQueryUnitTests.cs index 2e560e98b0..b35b0b9873 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosQueryUnitTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/CosmosQueryUnitTests.cs @@ -11,7 +11,7 @@ namespace Microsoft.Azure.Cosmos.Tests using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Query; - using Microsoft.Azure.Cosmos.Query.ExecutionComponent; + using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -58,7 +58,7 @@ public void VerifyCosmosQueryResponseStream() [TestMethod] public async Task TestCosmosQueryExecutionComponentOnFailure() { - (IList components, QueryResponseCore response) setupContext = await this.GetAllExecutionComponents(); + (IList components, QueryResponseCore response) setupContext = await this.GetAllExecutionComponents(); foreach (DocumentQueryExecutionComponentBase component in setupContext.components) { @@ -70,7 +70,7 @@ public async Task TestCosmosQueryExecutionComponentOnFailure() [TestMethod] public async Task TestCosmosQueryExecutionComponentCancellation() { - (IList components, QueryResponseCore response) setupContext = await this.GetAllExecutionComponents(); + (IList components, QueryResponseCore response) setupContext = await this.GetAllExecutionComponents(); CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.Cancel(); @@ -148,11 +148,11 @@ public async Task TestCosmosQueryPartitionKeyDefinition() await factory.ExecuteNextAsync(cancellationtoken); } - private async Task<(IList components, QueryResponseCore response)> GetAllExecutionComponents() + private async Task<(IList components, QueryResponseCore response)> GetAllExecutionComponents() { (Func> func, QueryResponseCore response) setupContext = this.SetupBaseContextToVerifyFailureScenario(); - List components = new List(); + List components = new List(); List operators = new List() { AggregateOperator.Average, @@ -163,6 +163,8 @@ public async Task TestCosmosQueryPartitionKeyDefinition() }; components.Add(await AggregateDocumentQueryExecutionComponent.CreateAsync( + Query.Core.ExecutionContext.ExecutionEnvironment.Client, + new Mock().Object, operators.ToArray(), new Dictionary() {