Skip to content

Commit

Permalink
Aggregate Continuation Tokens (#843)
Browse files Browse the repository at this point in the history
* added continuation token stop and start logic for aggregators

* got continuation token ready

* added preemption to aggregate query

* fixed bug where query finishes in a single continuation

* fixed continuation token double escaping bug

* fixed bug where where min / max / undefined value was getting serialized into a continuation token

* wired up try get state

* resolved iteration comments

* renamed

* fixed tests for aggregates

* dummy commit

* bumped the timeout

* reduced timeout

* dummy commit

* dummy commit

* fixed infinite loop

* aggregates technically can get continuation token (it's a single page always for now)

* forked aggregates into compute and sdk implementations

* fixed build issues

* forked create paths

* fixed cancellation token test

* resolved iteration comments

* resolved iteration comments

* resolved iteration comments

* changed continuation token logic

* more inlined with early exit

* more inlined with early exits
  • Loading branch information
bchong95 authored and ealsur committed Nov 19, 2019
1 parent 018447f commit 1322022
Show file tree
Hide file tree
Showing 30 changed files with 1,314 additions and 651 deletions.
44 changes: 25 additions & 19 deletions Microsoft.Azure.Cosmos/src/CosmosElements/CosmosElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ public static CosmosElement CreateFromBuffer(ReadOnlyMemory<byte> buffer)
return CosmosElement.Dispatch(jsonNavigator, jsonNavigatorNode);
}

public static CosmosElement Parse(string json)
{
byte[] buffer = Encoding.UTF8.GetBytes(json);
return CosmosElement.CreateFromBuffer(buffer);
}

public static CosmosElement Dispatch(
IJsonNavigator jsonNavigator,
IJsonNavigatorNode jsonNavigatorNode)
Expand Down Expand Up @@ -131,43 +125,55 @@ public static CosmosElement Dispatch(
return item;
}

public static bool TryParse(string serializedCosmosElement, out CosmosElement cosmosElement)
public static CosmosElement Parse(string json)
{
if (!CosmosElement.TryParse(json, out CosmosElement cosmosElement))
{
throw new ArgumentException($"Failed to parse json: {json}.");
}

return cosmosElement;
}

public static bool TryParse(
string serializedCosmosElement,
out CosmosElement cosmosElement)
{
cosmosElement = default(CosmosElement);
if (serializedCosmosElement == null)
{
throw new ArgumentNullException(nameof(serializedCosmosElement));
}

byte[] buffer = Encoding.UTF8.GetBytes(serializedCosmosElement);
try
{
byte[] buffer = Encoding.UTF8.GetBytes(serializedCosmosElement);
cosmosElement = CosmosElement.CreateFromBuffer(buffer);
return true;
}
catch (JsonParseException)
{
cosmosElement = default(CosmosElement);
return false;
cosmosElement = default;
}

return cosmosElement != default;
}

public static bool TryParse<TCosmosElement>(string serializedCosmosElement, out TCosmosElement cosmosElement)
where TCosmosElement : CosmosElement
public static bool TryParse<TCosmosElement>(
string serializedCosmosElement,
out TCosmosElement typedCosmosElement)
{
if (!CosmosElement.TryParse(serializedCosmosElement, out CosmosElement rawCosmosElement))
if (!CosmosElement.TryParse(serializedCosmosElement, out CosmosElement cosmosElement))
{
cosmosElement = default(TCosmosElement);
typedCosmosElement = default;
return false;
}

if (!(rawCosmosElement is TCosmosElement typedCosmosElement))
if (!(cosmosElement is TCosmosElement tempCosmosElement))
{
cosmosElement = default(TCosmosElement);
typedCosmosElement = default;
return false;
}

cosmosElement = typedCosmosElement;
typedCosmosElement = tempCosmosElement;
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.CosmosElements
using System;
using System.Collections.Generic;
using Microsoft.Azure.Cosmos.Json;
using Newtonsoft.Json;

#if INTERNAL
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
Expand Down Expand Up @@ -48,11 +47,6 @@ public override bool TryGetValue(string key, out CosmosElement value)
return this.dictionary.TryGetValue(key, out value);
}

public override string ToString()
{
return JsonConvert.SerializeObject(this.dictionary);
}

public override void WriteTo(IJsonWriter jsonWriter)
{
if (jsonWriter == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@
namespace Microsoft.Azure.Cosmos.Query
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Newtonsoft.Json;

internal sealed class QueryAggregateDiagnostics : CosmosDiagnostics
{
Expand All @@ -26,7 +23,7 @@ public QueryAggregateDiagnostics(
this.Pages = pages;
}

private IReadOnlyCollection<QueryPageDiagnostics> Pages { get; }
public IReadOnlyCollection<QueryPageDiagnostics> Pages { get; }

public override string ToString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ internal sealed class AverageAggregator : IAggregator
/// <summary>
/// The running weighted average for this aggregator.
/// </summary>
private AverageInfo globalAverage = new AverageInfo(0, 0);
private AverageInfo globalAverage;

private AverageAggregator(AverageInfo globalAverage)
{
this.globalAverage = globalAverage;
}

/// <summary>
/// Averages the supplied item with the previously supplied items.
Expand All @@ -40,6 +45,29 @@ public CosmosElement GetResult()
return this.globalAverage.GetAverage();
}

public string GetContinuationToken()
{
return this.globalAverage.ToString();
}

public static AverageAggregator Create(string continuationToken)
{
AverageInfo averageInfo;
if (continuationToken != null)
{
if (!AverageInfo.TryParse(continuationToken, out averageInfo))
{
throw new ArgumentException($"Invalid continuation token: {continuationToken}");
}
}
else
{
averageInfo = new AverageInfo(0, 0);
}

return new AverageAggregator(averageInfo);
}

/// <summary>
/// Struct that stores a weighted average as a sum and count so they that average across different partitions with different numbers of documents can be taken.
/// </summary>
Expand Down Expand Up @@ -166,6 +194,31 @@ public CosmosNumber GetAverage()

return CosmosNumber64.Create(this.Sum.Value / this.Count);
}

public override string ToString()
{
return $@"{{
{(this.Sum.HasValue ? $@"""{SumName}"" : {this.Sum.Value}," : string.Empty)}
""{CountName}"" : {this.Count}
}}";
}

public static bool TryParse(string serializedAverageInfo, out AverageInfo averageInfo)
{
if (serializedAverageInfo == null)
{
throw new ArgumentNullException(nameof(serializedAverageInfo));
}

if (!CosmosElement.TryParse(serializedAverageInfo, out CosmosElement cosmosElementAverageInfo))
{
averageInfo = default(AverageInfo);
return false;
}

averageInfo = AverageInfo.Create(cosmosElementAverageInfo);
return true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
namespace Microsoft.Azure.Cosmos.Query.Aggregation
{
using System;
using System.Globalization;
using System.Net;
using Microsoft.Azure.Cosmos.CosmosElements;

/// <summary>
Expand All @@ -18,6 +20,16 @@ internal sealed class CountAggregator : IAggregator
/// </summary>
private long globalCount;

private CountAggregator(long initialCount)
{
if (initialCount < 0)
{
throw new ArgumentOutOfRangeException(nameof(initialCount));
}

this.globalCount = initialCount;
}

/// <summary>
/// Adds a count to the running count.
/// </summary>
Expand Down Expand Up @@ -47,5 +59,30 @@ public CosmosElement GetResult()
{
return CosmosNumber64.Create(this.globalCount);
}

public string GetContinuationToken()
{
return this.globalCount.ToString(CultureInfo.InvariantCulture);
}

public static CountAggregator Create(string continuationToken)
{
long partialCount;
if (continuationToken != null)
{
if (!long.TryParse(continuationToken, out partialCount))
{
throw new CosmosException(
HttpStatusCode.BadRequest,
$@"Invalid count continuation token: ""{continuationToken}"".");
}
}
else
{
partialCount = 0;
}

return new CountAggregator(initialCount: partialCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,11 @@ internal interface IAggregator
/// </summary>
/// <returns>The result of the aggregation.</returns>
CosmosElement GetResult();

/// <summary>
/// Gets a continuation token that stores the partial aggregate up till this point.
/// </summary>
/// <returns>A continuation token that stores the partial aggregate up till this point.</returns>
string GetContinuationToken();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ namespace Microsoft.Azure.Cosmos.Query.Aggregation
/// </summary>
internal sealed class MinMaxAggregator : IAggregator
{
private const string MinValueContinuationToken = "MIN_VALUE";
private const string MaxValueContinuationToken = "MAX_VALUE";
private const string UndefinedContinuationToken = "UNDEFINED";

private static readonly CosmosElement Undefined = null;
/// <summary>
/// Whether or not the aggregation is a min or a max.
Expand All @@ -24,17 +28,10 @@ internal sealed class MinMaxAggregator : IAggregator
/// </summary>
private CosmosElement globalMinMax;

public MinMaxAggregator(bool isMinAggregation)
private MinMaxAggregator(bool isMinAggregation, CosmosElement globalMinMax)
{
this.isMinAggregation = isMinAggregation;
if (this.isMinAggregation)
{
globalMinMax = ItemComparer.MaxValue;
}
else
{
globalMinMax = ItemComparer.MinValue;
}
this.globalMinMax = globalMinMax;
}

public void Aggregate(CosmosElement localMinMax)
Expand Down Expand Up @@ -98,7 +95,7 @@ public void Aggregate(CosmosElement localMinMax)
if (!ItemComparer.IsMinOrMax(this.globalMinMax)
&& (!CosmosElementIsPrimitive(localMinMax) || !CosmosElementIsPrimitive(this.globalMinMax)))
{
// This means we are comparing non primitives with is undefined
// This means we are comparing non primitives which is undefined
this.globalMinMax = Undefined;
return;
}
Expand Down Expand Up @@ -136,6 +133,72 @@ public CosmosElement GetResult()
return result;
}

public string GetContinuationToken()
{
string continuationToken;
if (this.globalMinMax == ItemComparer.MinValue)
{
continuationToken = MinMaxAggregator.MinValueContinuationToken;
}
else if (this.globalMinMax == ItemComparer.MaxValue)
{
continuationToken = MinMaxAggregator.MaxValueContinuationToken;
}
else if (this.globalMinMax == Undefined)
{
continuationToken = MinMaxAggregator.UndefinedContinuationToken;
}
else
{
continuationToken = this.globalMinMax.ToString();
}

return continuationToken;
}

public static MinMaxAggregator CreateMinAggregator(string continuationToken)
{
return MinMaxAggregator.Create(isMinAggregation: true, continuationToken: continuationToken);
}

public static MinMaxAggregator CreateMaxAggregator(string continuationToken)
{
return MinMaxAggregator.Create(isMinAggregation: false, continuationToken: continuationToken);
}

private static MinMaxAggregator Create(bool isMinAggregation, string continuationToken)
{
CosmosElement globalMinMax;
if (continuationToken != null)
{
if (continuationToken == MinMaxAggregator.MaxValueContinuationToken)
{
globalMinMax = ItemComparer.MaxValue;
}
else if (continuationToken == MinMaxAggregator.MinValueContinuationToken)
{
globalMinMax = ItemComparer.MinValue;
}
else if (continuationToken == MinMaxAggregator.UndefinedContinuationToken)
{
globalMinMax = MinMaxAggregator.Undefined;
}
else
{
if (!CosmosElement.TryParse(continuationToken, out globalMinMax))
{
throw new ArgumentException($"Malformed continuation token: {continuationToken}");
}
}
}
else
{
globalMinMax = isMinAggregation ? (CosmosElement)ItemComparer.MaxValue : (CosmosElement)ItemComparer.MinValue;
}

return new MinMaxAggregator(isMinAggregation: isMinAggregation, globalMinMax: globalMinMax);
}

private static bool CosmosElementIsPrimitive(CosmosElement cosmosElement)
{
if (cosmosElement == null)
Expand Down
Loading

0 comments on commit 1322022

Please sign in to comment.