Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Fixes non streaming order by to use flag from query plan #4459

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,17 @@ private sealed class InitializationParameters

public int MaxConcurrency { get; }

public bool NonStreamingOrderBy { get; }
neildsh marked this conversation as resolved.
Show resolved Hide resolved

public InitializationParameters(
IDocumentContainer documentContainer,
SqlQuerySpec sqlQuerySpec,
IReadOnlyList<FeedRangeEpk> targetRanges,
PartitionKey? partitionKey,
IReadOnlyList<OrderByColumn> orderByColumns,
QueryPaginationOptions queryPaginationOptions,
int maxConcurrency)
int maxConcurrency,
bool nonStreamingOrderBy)
{
this.DocumentContainer = documentContainer ?? throw new ArgumentNullException(nameof(documentContainer));
this.SqlQuerySpec = sqlQuerySpec ?? throw new ArgumentNullException(nameof(sqlQuerySpec));
Expand All @@ -71,6 +74,7 @@ public InitializationParameters(
this.OrderByColumns = orderByColumns ?? throw new ArgumentNullException(nameof(orderByColumns));
this.QueryPaginationOptions = queryPaginationOptions ?? throw new ArgumentNullException(nameof(queryPaginationOptions));
this.MaxConcurrency = maxConcurrency;
this.NonStreamingOrderBy = nonStreamingOrderBy;
}
}

Expand Down Expand Up @@ -181,6 +185,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
IReadOnlyList<OrderByColumn> orderByColumns,
QueryPaginationOptions queryPaginationOptions,
int maxConcurrency,
bool nonStreamingOrderBy,
CosmosElement continuationToken)
{
if (documentContainer == null)
Expand Down Expand Up @@ -233,24 +238,28 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
partitionKey,
orderByColumns,
queryPaginationOptions,
maxConcurrency);
maxConcurrency,
nonStreamingOrderBy);

return TryCatch<IQueryPipelineStage>.FromResult(new OrderByCrossPartitionQueryPipelineStage(init));
}

private static async ValueTask<(TryCatch<IQueryPipelineStage>, Queue<QueryPage>)> MoveNextAsync_InitializeAsync(InitializationParameters init, ITrace trace, CancellationToken cancellationToken)
private static async ValueTask<(TryCatch<IQueryPipelineStage>, Queue<QueryPage>)> MoveNextAsync_InitializeAsync(
InitializationParameters parameters,
ITrace trace,
CancellationToken cancellationToken)
{
SqlQuerySpec rewrittenQueryForOrderBy = new SqlQuerySpec(
init.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
init.SqlQuerySpec.Parameters);
parameters.SqlQuerySpec.QueryText.Replace(oldValue: FormatPlaceHolder, newValue: TrueFilter),
parameters.SqlQuerySpec.Parameters);

List<OrderByQueryPartitionRangePageAsyncEnumerator> uninitializedEnumerators = init.TargetRanges
List<OrderByQueryPartitionRangePageAsyncEnumerator> uninitializedEnumerators = parameters.TargetRanges
.Select(range => OrderByQueryPartitionRangePageAsyncEnumerator.Create(
init.DocumentContainer,
parameters.DocumentContainer,
rewrittenQueryForOrderBy,
new FeedRangeState<QueryState>(range, state: default),
init.PartitionKey,
init.QueryPaginationOptions,
parameters.PartitionKey,
parameters.QueryPaginationOptions,
TrueFilter,
PrefetchPolicy.PrefetchSinglePage))
.ToList();
Expand All @@ -259,13 +268,12 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
uninitializedEnumerators
.Select(x => (x, (OrderByContinuationToken)null)));

await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, init.MaxConcurrency, trace, cancellationToken);
await ParallelPrefetch.PrefetchInParallelAsync(uninitializedEnumerators, parameters.MaxConcurrency, trace, cancellationToken);

IReadOnlyList<SortOrder> sortOrders = init.OrderByColumns.Select(column => column.SortOrder).ToList();
IReadOnlyList<SortOrder> sortOrders = parameters.OrderByColumns.Select(column => column.SortOrder).ToList();
PriorityQueue<OrderByQueryPartitionRangePageAsyncEnumerator> initializedEnumerators = new PriorityQueue<OrderByQueryPartitionRangePageAsyncEnumerator>(new OrderByEnumeratorComparer(sortOrders));
Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)> enumeratorsAndTokens = new Queue<(OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken token)>();

bool nonStreaming = false;
Queue<QueryPage> bufferedPages = new Queue<QueryPage>();
QueryPageParameters queryPageParameters = null;
while (uninitializedEnumeratorsAndTokens.Count != 0)
Expand All @@ -278,7 +286,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
if (IsSplitException(enumerator.Current.Exception))
{
await MoveNextAsync_InitializeAsync_HandleSplitAsync(
init.DocumentContainer,
parameters.DocumentContainer,
uninitializedEnumeratorsAndTokens,
enumerator,
token,
Expand Down Expand Up @@ -307,9 +315,6 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
additionalHeaders: page.AdditionalHeaders);
}

// For backwards compatibility the default value of streaming for ORDER BY is _true_
nonStreaming = nonStreaming || (!page.Streaming.GetValueOrDefault(true) && (page.State != null));

if (enumerator.Current.Result.Enumerator.MoveNext())
{
// the page is non-empty then we need to enqueue the enumerator in the PriorityQueue
Expand All @@ -335,7 +340,7 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
}

IQueryPipelineStage pipelineStage;
if (nonStreaming)
if (parameters.NonStreamingOrderBy)
{
Queue<OrderByQueryPartitionRangePageAsyncEnumerator> orderbyEnumerators = new Queue<OrderByQueryPartitionRangePageAsyncEnumerator>();
foreach ((OrderByQueryPartitionRangePageAsyncEnumerator enumerator, OrderByContinuationToken _) in enumeratorsAndTokens)
Expand All @@ -350,10 +355,10 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
orderbyEnumerators.Enqueue(bufferedEnumerator);
}

await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, init.MaxConcurrency, trace, cancellationToken);
await ParallelPrefetch.PrefetchInParallelAsync(orderbyEnumerators, parameters.MaxConcurrency, trace, cancellationToken);

pipelineStage = await NonStreamingOrderByPipelineStage.CreateAsync(
init.QueryPaginationOptions,
parameters.QueryPaginationOptions,
sortOrders,
orderbyEnumerators,
queryPageParameters,
Expand All @@ -363,12 +368,12 @@ await MoveNextAsync_InitializeAsync_HandleSplitAsync(
else
{
pipelineStage = StreamingOrderByCrossPartitionQueryPipelineStage.Create(
init.DocumentContainer,
parameters.DocumentContainer,
sortOrders,
initializedEnumerators,
enumeratorsAndTokens,
init.QueryPaginationOptions,
init.MaxConcurrency);
parameters.QueryPaginationOptions,
parameters.MaxConcurrency);
}

return (TryCatch<IQueryPipelineStage>.FromResult(pipelineStage), bufferedPages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
.OrderByExpressions
.Zip(queryInfo.OrderBy, (expression, sortOrder) => new OrderByColumn(expression, sortOrder)).ToList(),
queryPaginationOptions: queryPaginationOptions,
maxConcurrency: maxConcurrency,
maxConcurrency: maxConcurrency,
nonStreamingOrderBy: queryInfo.HasNonStreamingOrderBy,
continuationToken: continuationToken);
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ FROM c
[Benchmark(Baseline = true)]
public Task StreamingOrderByPipelineStage()
{
return CreateAndRunPipeline(StreamingContainer);
return CreateAndRunPipeline(StreamingContainer, nonStreamingOrderBy: false);
}

[Benchmark]
public Task NonStreamingOrderByPipelineStage()
{
return CreateAndRunPipeline(NonStreamingContainer);
return CreateAndRunPipeline(NonStreamingContainer, nonStreamingOrderBy: true);
}

private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer)
private static async Task CreateAndRunPipeline(IDocumentContainer documentContainer, bool nonStreamingOrderBy)
{
IReadOnlyList<FeedRangeEpk> ranges = await documentContainer.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
Expand All @@ -67,6 +67,7 @@ private static async Task CreateAndRunPipeline(IDocumentContainer documentContai
orderByColumns: OrderByColumns,
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: EndUserPageSize),
maxConcurrency: MaxConcurrency,
nonStreamingOrderBy: nonStreamingOrderBy,
continuationToken: null);

IQueryPipelineStage pipeline = pipelineStage.Result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,16 @@ private static async Task RunParityTests(
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: true);

IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
documentContainer: documentContainer,
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: false);

if (!streamingResult.SequenceEqual(nonStreamingResult))
{
Expand All @@ -255,7 +257,8 @@ private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStag
IReadOnlyList<FeedRangeEpk> ranges,
string queryText,
IReadOnlyList<OrderByColumn> orderByColumns,
int pageSize)
int pageSize,
bool nonStreamingOrderBy)
{
TryCatch<IQueryPipelineStage> pipelineStage = OrderByCrossPartitionQueryPipelineStage.MonadicCreate(
documentContainer: documentContainer,
Expand All @@ -265,6 +268,7 @@ private static async Task<IReadOnlyList<CosmosElement>> CreateAndRunPipelineStag
orderByColumns: orderByColumns,
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: pageSize),
maxConcurrency: MaxConcurrency,
nonStreamingOrderBy: nonStreamingOrderBy,
continuationToken: null);

Assert.IsTrue(pipelineStage.Succeeded);
Expand Down Expand Up @@ -311,15 +315,17 @@ private static async Task RunParityTests(IReadOnlyList<ParityTestCase> testCases
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: true);

DebugTraceHelpers.TraceStreamingPipelineStarting();
IReadOnlyList<CosmosElement> streamingResult = await CreateAndRunPipelineStage(
documentContainer: streamingDocumentContainer,
ranges: ranges,
queryText: testCase.QueryText,
orderByColumns: testCase.OrderByColumns,
pageSize: pageSize);
pageSize: pageSize,
nonStreamingOrderBy: false);

if (!streamingResult.SequenceEqual(nonStreamingResult))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void MonadicCreate_NullContinuationToken()
new OrderByColumn("_ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
}
Expand All @@ -98,6 +99,7 @@ public void MonadicCreate_NonCosmosArrayContinuationToken()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosObject.Create(new Dictionary<string, CosmosElement>()));
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
Expand All @@ -119,6 +121,7 @@ public void MonadicCreate_EmptyArrayContinuationToken()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(new List<CosmosElement>()));
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
Expand All @@ -140,6 +143,7 @@ public void MonadicCreate_NonParallelContinuationToken()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(new List<CosmosElement>() { CosmosString.Create("asdf") }));
Assert.IsTrue(monadicCreate.Failed);
Assert.IsTrue(monadicCreate.InnerMostException is MalformedContinuationTokenException);
Expand Down Expand Up @@ -176,6 +180,7 @@ public void MonadicCreate_SingleOrderByContinuationToken()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
Expand Down Expand Up @@ -220,6 +225,7 @@ public void MonadicCreate_SingleOrderByContinuationToken()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
Expand Down Expand Up @@ -279,6 +285,7 @@ public void MonadicCreate_MultipleOrderByContinuationToken()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
Expand Down Expand Up @@ -321,6 +328,7 @@ public void MonadicCreate_OrderByWithResumeValues()
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
Expand Down Expand Up @@ -361,7 +369,8 @@ public void MonadicCreate_OrderByWithResumeValues()
new OrderByColumn("item2", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: CosmosArray.Create(
new List<CosmosElement>()
{
Expand Down Expand Up @@ -416,7 +425,8 @@ public async Task TestFormattedFiltersForTargetPartitionWithContinuationTokenAsy
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 1),
maxConcurrency: 0,
maxConcurrency: 0,
nonStreamingOrderBy: false,
continuationToken: CosmosElement.Parse(continuationToken));
Assert.IsTrue(monadicCreate.Succeeded);

Expand Down Expand Up @@ -451,7 +461,8 @@ FROM c
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
Expand Down Expand Up @@ -500,7 +511,8 @@ FROM c
new OrderByColumn("c._ts", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: null);
Assert.IsTrue(monadicCreate.Succeeded);
IQueryPipelineStage queryPipelineStage = monadicCreate.Result;
Expand Down Expand Up @@ -557,7 +569,8 @@ FROM c
new OrderByColumn("c.pk", SortOrder.Ascending)
},
queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10),
maxConcurrency: 10,
maxConcurrency: 10,
nonStreamingOrderBy: false,
continuationToken: continuationToken);
monadicQueryPipelineStage.ThrowIfFailed();
IQueryPipelineStage queryPipelineStage = monadicQueryPipelineStage.Result;
Expand Down
Loading