Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query: Adds support for non streaming ORDER BY #4362

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2d90f0f
Initial prep for adding a non streaming order by query pipline stage
neildsh Mar 9, 2024
92a0625
Change TracingAsyncEnumerator to be an adapter class between ITracing…
neildsh Mar 11, 2024
0eff840
Move TracingAsyncEnumerator to product code
neildsh Mar 11, 2024
3ae0b62
Change ITracingAsyncEnumerator.MoveNext to take a CancellationToken, …
neildsh Mar 11, 2024
0ff0687
draft implementation of non streaming order by pipeline stage
neildsh Mar 14, 2024
b252123
Fix bug that drops first record on an initialized page
neildsh Mar 14, 2024
163f31f
Add a test class for non streaming order by unit tests
neildsh Mar 14, 2024
83e1726
remove unnecessary usings
neildsh Mar 14, 2024
bf2e7dd
Add an emulator test for non streaming order by. Also lay groundwork …
neildsh Mar 18, 2024
275b063
Remove the ResponseLengthInBytes property from QueryPage. This was be…
neildsh Mar 18, 2024
94e3125
Add an ItemCount property to the Page class
neildsh Mar 18, 2024
9eb7809
Add a multi level heap implementation for non streaming order by
neildsh Mar 20, 2024
ce69f9d
Add infrastructure for writing parity tests
neildsh Mar 20, 2024
702c7ad
Fix a bug that caused the page enumerator to be dropped when we reach…
neildsh Mar 20, 2024
cb874da
When cloning OrderByQueryPartitionRangePageAsyncEnumerator as a fully…
neildsh Mar 20, 2024
97d93d9
Simplify the non streaming pipeline stage, and add performance test
neildsh Mar 22, 2024
075cb46
Revert the changes for Headers.ItemCount Keep it as a string
neildsh Mar 22, 2024
1d02ee8
Avoid an allocation each time the OrderByItems property of OrderByQue…
neildsh Mar 22, 2024
5f464d0
Fix up the OrderByPipelineSatgeBenchmark to use fully materialized Co…
neildsh Mar 22, 2024
8089fdf
Add a few more unit tests for non streaming order by
neildsh Mar 22, 2024
6e5a807
Add emulator tests for non streaming order by
neildsh Mar 25, 2024
82bde9b
Add a few more integration test cases
neildsh Mar 26, 2024
be3fb81
Fix up broken unit test
neildsh Mar 26, 2024
c54e89b
Add more test coverage for the non streaming order by
neildsh Mar 27, 2024
8babdb7
If there is no continuationtoken, assume that the response is streaming
neildsh Mar 27, 2024
784295a
Add stronger validation to the non streaming order by unit tests
neildsh Mar 27, 2024
98218bb
Fix up broken unit tests to account for ItemCount
neildsh Mar 27, 2024
6b66ad2
fix up plumbing for index utilization, and incorporate code review fe…
neildsh Mar 27, 2024
6abe501
Minor clean up
neildsh Mar 27, 2024
cf6f04b
revert bug introduced in pursuit of more elegant code :)
neildsh Mar 28, 2024
3065ed1
fix up broken unit test
neildsh Mar 28, 2024
b966a8a
Fix up broken perf test
neildsh Mar 28, 2024
3661c36
Fix up broken IndexMetricsParserBaselineTest
neildsh Mar 29, 2024
74f3eff
Minor bug fixes for OrderByCrossPartitionEnumerator
neildsh Mar 29, 2024
05befa6
Merge branch 'master' into users/ndeshpan/nonStreamingOrderBy
neildsh Mar 29, 2024
2942dd1
Merge branch 'master' into users/ndeshpan/nonStreamingOrderBy
neildsh Mar 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Serializer;

using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerable : IAsyncEnumerable<TryCatch<ChangeFeedPage>>
{
private readonly IDocumentContainer documentContainer;
Expand All @@ -37,12 +38,13 @@ public IAsyncEnumerator<TryCatch<ChangeFeedPage>> GetAsyncEnumerator(Cancellatio
CrossPartitionChangeFeedAsyncEnumerator innerEnumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
this.documentContainer,
innerState,
this.changeFeedPaginationOptions,
cancellationToken);
this.changeFeedPaginationOptions);

return new ChangeFeedCrossFeedRangeAsyncEnumerator(
ChangeFeedCrossFeedRangeAsyncEnumerator changeFeedEnumerator = new ChangeFeedCrossFeedRangeAsyncEnumerator(
innerEnumerator,
this.jsonSerializationFormatOptions);
this.jsonSerializationFormatOptions);

return new TracingAsyncEnumerator<TryCatch<ChangeFeedPage>>(changeFeedEnumerator, NoOpTrace.Singleton, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Serializer;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerator : IAsyncEnumerator<TryCatch<ChangeFeedPage>>
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerator : ITracingAsyncEnumerator<TryCatch<ChangeFeedPage>>
{
private readonly CrossPartitionChangeFeedAsyncEnumerator enumerator;
private readonly JsonSerializationFormatOptions jsonSerializationFormatOptions;
Expand All @@ -29,9 +30,9 @@ public ChangeFeedCrossFeedRangeAsyncEnumerator(

public ValueTask DisposeAsync() => this.enumerator.DisposeAsync();

public async ValueTask<bool> MoveNextAsync()
public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
if (!await this.enumerator.MoveNextAsync())
if (!await this.enumerator.MoveNextAsync(trace, cancellationToken))
{
throw new InvalidOperationException("Change Feed should always be able to move next.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public ChangeFeedIteratorCore(
changeFeedRequestOptions?.PageSizeHint,
changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
additionalHeaders,
this.changeFeedQuerySpec),
cancellationToken: default);
this.changeFeedQuerySpec));
TryCatch<CrossPartitionChangeFeedAsyncEnumerator> monadicEnumerator = TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromResult(enumerator);
return monadicEnumerator;
Expand Down Expand Up @@ -274,11 +273,10 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
}

CrossPartitionChangeFeedAsyncEnumerator enumerator = monadicEnumerator.Result;
enumerator.SetCancellationToken(cancellationToken);

try
{
if (!await enumerator.MoveNextAsync(trace))
if (!await enumerator.MoveNextAsync(trace, cancellationToken))
{
throw new InvalidOperationException("ChangeFeed enumerator should always have a next continuation");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ internal sealed class ChangeFeedNotModifiedPage : ChangeFeedPage
private static readonly ImmutableHashSet<string> bannedHeaders = new HashSet<string>().ToImmutableHashSet();

public ChangeFeedNotModifiedPage(
double requestCharge,
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
}

}

public override int ItemCount => 0;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ internal sealed class ChangeFeedPartitionRangePageAsyncEnumerator : PartitionRan
public ChangeFeedPartitionRangePageAsyncEnumerator(
IChangeFeedDataSource changeFeedDataSource,
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken)
: base(feedRangeState, cancellationToken)
ChangeFeedPaginationOptions changeFeedPaginationOptions)
: base(feedRangeState)
{
this.changeFeedDataSource = changeFeedDataSource ?? throw new ArgumentNullException(nameof(changeFeedDataSource));
this.changeFeedPaginationOptions = changeFeedPaginationOptions ?? throw new ArgumentNullException(nameof(changeFeedPaginationOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ internal sealed class ChangeFeedSuccessPage : ChangeFeedPage

public ChangeFeedSuccessPage(
Stream content,
double requestCharge,
double requestCharge,
int itemCount,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.ItemCount = itemCount;
}

public Stream Content { get; }

public Stream Content { get; }

public override int ItemCount { get; }

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,24 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class CrossPartitionChangeFeedAsyncEnumerator : IAsyncEnumerator<TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>>
internal sealed class CrossPartitionChangeFeedAsyncEnumerator : ITracingAsyncEnumerator<TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>>
{
private readonly CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator;
private CancellationToken cancellationToken;
private TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>? bufferedException;

private CrossPartitionChangeFeedAsyncEnumerator(
CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator,
CancellationToken cancellationToken)
CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator)
{
this.crossPartitionEnumerator = crossPartitionEnumerator ?? throw new ArgumentNullException(nameof(crossPartitionEnumerator));
this.cancellationToken = cancellationToken;
}

public TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>> Current { get; private set; }

public ValueTask DisposeAsync() => this.crossPartitionEnumerator.DisposeAsync();

public ValueTask<bool> MoveNextAsync()
{
return this.MoveNextAsync(NoOpTrace.Singleton);
}

public async ValueTask<bool> MoveNextAsync(ITrace trace)
public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
this.cancellationToken.ThrowIfCancellationRequested();
cancellationToken.ThrowIfCancellationRequested();

if (trace == null)
{
Expand All @@ -53,7 +45,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
return true;
}

if (!await this.crossPartitionEnumerator.MoveNextAsync(changeFeedMoveNextTrace))
if (!await this.crossPartitionEnumerator.MoveNextAsync(changeFeedMoveNextTrace, cancellationToken))
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
}
Expand All @@ -80,7 +72,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
double totalRequestCharge = backendPage.RequestCharge;
do
{
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages))
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages, cancellationToken))
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
}
Expand All @@ -107,15 +99,16 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
{
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
totalRequestCharge,
changeFeedSuccessPage.ItemCount,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
Expand All @@ -133,17 +126,10 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
}
}

public void SetCancellationToken(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
this.crossPartitionEnumerator.SetCancellationToken(cancellationToken);
}

public static CrossPartitionChangeFeedAsyncEnumerator Create(
IDocumentContainer documentContainer,
CrossFeedRangeState<ChangeFeedState> state,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken)
ChangeFeedPaginationOptions changeFeedPaginationOptions)
{
changeFeedPaginationOptions ??= ChangeFeedPaginationOptions.Default;

Expand All @@ -156,17 +142,14 @@ public static CrossPartitionChangeFeedAsyncEnumerator Create(
documentContainer,
CrossPartitionChangeFeedAsyncEnumerator.MakeCreateFunction(
documentContainer,
changeFeedPaginationOptions,
cancellationToken),
changeFeedPaginationOptions),
comparer: default /* this uses a regular queue instead of priority queue */,
maxConcurrency: default,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
cancellationToken: cancellationToken,
state: state);

CrossPartitionChangeFeedAsyncEnumerator enumerator = new CrossPartitionChangeFeedAsyncEnumerator(
crossPartitionEnumerator,
cancellationToken);
crossPartitionEnumerator);

return enumerator;
}
Expand All @@ -181,11 +164,9 @@ private static bool IsNextRangeEqualToOriginal(

private static CreatePartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> MakeCreateFunction(
IChangeFeedDataSource changeFeedDataSource,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
ChangeFeedPaginationOptions changeFeedPaginationOptions) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
changeFeedDataSource,
feedRangeState,
changeFeedPaginationOptions,
cancellationToken);
changeFeedPaginationOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@ internal sealed class BufferedPartitionRangePageAsyncEnumerator<TPage, TState> :
where TState : State
{
private readonly PartitionRangePageAsyncEnumerator<TPage, TState> enumerator;
private TryCatch<TPage>? bufferedPage;
private TryCatch<TPage>? bufferedPage;

public override Exception BufferedException
{
get
{
if (this.bufferedPage.HasValue && this.bufferedPage.Value.Failed)
{
return this.bufferedPage.Value.Exception;
}

return null;
}
}

public override int BufferedItemCount => this.bufferedPage.HasValue && this.bufferedPage.Value.Succeeded ?
this.bufferedPage.Value.Result.ItemCount :
0;

public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator, CancellationToken cancellationToken)
: base(enumerator.FeedRangeState, cancellationToken)
public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator)
: base(enumerator.FeedRangeState)
{
this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
}
Expand Down Expand Up @@ -59,15 +76,9 @@ public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken ca

using (ITrace prefetchTrace = trace.StartChild("Prefetch", TraceComponent.Pagination, TraceLevel.Info))
{
await this.enumerator.MoveNextAsync(prefetchTrace);
await this.enumerator.MoveNextAsync(prefetchTrace, cancellationToken);
this.bufferedPage = this.enumerator.Current;
}
}

public override void SetCancellationToken(CancellationToken cancellationToken)
{
base.SetCancellationToken(cancellationToken);
this.enumerator.SetCancellationToken(cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Pagination
{
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;
Expand All @@ -12,10 +13,14 @@ internal abstract class BufferedPartitionRangePageAsyncEnumeratorBase<TPage, TSt
where TPage : Page<TState>
where TState : State
{
protected BufferedPartitionRangePageAsyncEnumeratorBase(FeedRangeState<TState> feedRangeState, CancellationToken cancellationToken)
: base(feedRangeState, cancellationToken)
protected BufferedPartitionRangePageAsyncEnumeratorBase(FeedRangeState<TState> feedRangeState)
: base(feedRangeState)
{
}
}

public abstract Exception BufferedException { get; }

public abstract int BufferedItemCount { get; }

public abstract ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken);
}
Expand Down
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/Pagination/CrossFeedRangePage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ public CrossFeedRangePage(TBackendPage backendEndPage, CrossFeedRangeState<TBack
this.Page = backendEndPage;
}

public TBackendPage Page { get; }

public TBackendPage Page { get; }

public override int ItemCount => this.Page.ItemCount;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Loading
Loading