Skip to content

Commit

Permalink
Query: Adds support for non streaming ORDER BY (#4362)
Browse files Browse the repository at this point in the history
* Initial prep for adding a non streaming order by query pipline stage

* Change TracingAsyncEnumerator to be an adapter class between ITracingAsyncEnumerator and IAsyncEnumerator

* Move TracingAsyncEnumerator to product code

* Change ITracingAsyncEnumerator.MoveNext to take a CancellationToken, thereby removing the awkward method SetCancellationToken from IQueryPipelineStage

* draft implementation of non streaming order by pipeline stage

* Fix bug that drops first record on an initialized page

* Add a test class for non streaming order by unit tests

* remove unnecessary usings

* Add an emulator test for non streaming order by. Also lay groundwork for an alternative implementation for non streaming order by

* Remove the ResponseLengthInBytes property from QueryPage. This was being calculated incorrectly, and seems to be unused to boot.

* Add an ItemCount property to the Page class

* Add a multi level heap implementation for non streaming order by

* Add infrastructure for writing parity tests

* Fix a bug that caused the page enumerator to be dropped when we reach flat heap sizze limit

* When cloning OrderByQueryPartitionRangePageAsyncEnumerator as a fully buffered enumerator, make sure that we set the page size to maximum for the backend requests

* Simplify the non streaming pipeline stage, and add performance test

* Revert the changes for Headers.ItemCount Keep it as a string

* Avoid an allocation each time the OrderByItems property of OrderByQueryResult is touched

* Fix up the OrderByPipelineSatgeBenchmark to use fully materialized CosmosElements in the oages returned from MockContainer

* Add a few more unit tests for non streaming order by

* Add emulator tests for non streaming order by

* Add a few more integration test cases

* Fix up broken unit test

* Add more test coverage for the non streaming order by

* If there is no continuationtoken, assume that the response is streaming

* Add stronger validation to the non streaming order by unit tests

* Fix up broken unit tests to account for ItemCount

* fix up plumbing for index utilization, and incorporate code review feedback

* Minor clean up

* revert bug introduced in pursuit of more elegant code :)

* fix up broken unit test

* Fix up broken perf test

* Fix up broken IndexMetricsParserBaselineTest

* Minor bug fixes for OrderByCrossPartitionEnumerator
  • Loading branch information
neildsh committed Apr 1, 2024
1 parent 4855a6a commit 5788326
Show file tree
Hide file tree
Showing 109 changed files with 8,445 additions and 6,371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Serializer;

using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerable : IAsyncEnumerable<TryCatch<ChangeFeedPage>>
{
private readonly IDocumentContainer documentContainer;
Expand All @@ -37,12 +38,13 @@ public IAsyncEnumerator<TryCatch<ChangeFeedPage>> GetAsyncEnumerator(Cancellatio
CrossPartitionChangeFeedAsyncEnumerator innerEnumerator = CrossPartitionChangeFeedAsyncEnumerator.Create(
this.documentContainer,
innerState,
this.changeFeedPaginationOptions,
cancellationToken);
this.changeFeedPaginationOptions);

return new ChangeFeedCrossFeedRangeAsyncEnumerator(
ChangeFeedCrossFeedRangeAsyncEnumerator changeFeedEnumerator = new ChangeFeedCrossFeedRangeAsyncEnumerator(
innerEnumerator,
this.jsonSerializationFormatOptions);
this.jsonSerializationFormatOptions);

return new TracingAsyncEnumerator<TryCatch<ChangeFeedPage>>(changeFeedEnumerator, NoOpTrace.Singleton, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Serializer;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerator : IAsyncEnumerator<TryCatch<ChangeFeedPage>>
using Microsoft.Azure.Cosmos.Serializer;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class ChangeFeedCrossFeedRangeAsyncEnumerator : ITracingAsyncEnumerator<TryCatch<ChangeFeedPage>>
{
private readonly CrossPartitionChangeFeedAsyncEnumerator enumerator;
private readonly JsonSerializationFormatOptions jsonSerializationFormatOptions;
Expand All @@ -29,9 +30,9 @@ public ChangeFeedCrossFeedRangeAsyncEnumerator(

public ValueTask DisposeAsync() => this.enumerator.DisposeAsync();

public async ValueTask<bool> MoveNextAsync()
public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
if (!await this.enumerator.MoveNextAsync())
if (!await this.enumerator.MoveNextAsync(trace, cancellationToken))
{
throw new InvalidOperationException("Change Feed should always be able to move next.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public ChangeFeedIteratorCore(
changeFeedRequestOptions?.PageSizeHint,
changeFeedRequestOptions?.JsonSerializationFormatOptions?.JsonSerializationFormat,
additionalHeaders,
this.changeFeedQuerySpec),
cancellationToken: default);
this.changeFeedQuerySpec));
TryCatch<CrossPartitionChangeFeedAsyncEnumerator> monadicEnumerator = TryCatch<CrossPartitionChangeFeedAsyncEnumerator>.FromResult(enumerator);
return monadicEnumerator;
Expand Down Expand Up @@ -274,11 +273,10 @@ private async Task<ResponseMessage> ReadNextInternalAsync(ITrace trace, Cancella
}

CrossPartitionChangeFeedAsyncEnumerator enumerator = monadicEnumerator.Result;
enumerator.SetCancellationToken(cancellationToken);

try
{
if (!await enumerator.MoveNextAsync(trace))
if (!await enumerator.MoveNextAsync(trace, cancellationToken))
{
throw new InvalidOperationException("ChangeFeed enumerator should always have a next continuation");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ internal sealed class ChangeFeedNotModifiedPage : ChangeFeedPage
private static readonly ImmutableHashSet<string> bannedHeaders = new HashSet<string>().ToImmutableHashSet();

public ChangeFeedNotModifiedPage(
double requestCharge,
double requestCharge,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
}

}

public override int ItemCount => 0;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ internal sealed class ChangeFeedPartitionRangePageAsyncEnumerator : PartitionRan
public ChangeFeedPartitionRangePageAsyncEnumerator(
IChangeFeedDataSource changeFeedDataSource,
FeedRangeState<ChangeFeedState> feedRangeState,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken)
: base(feedRangeState, cancellationToken)
ChangeFeedPaginationOptions changeFeedPaginationOptions)
: base(feedRangeState)
{
this.changeFeedDataSource = changeFeedDataSource ?? throw new ArgumentNullException(nameof(changeFeedDataSource));
this.changeFeedPaginationOptions = changeFeedPaginationOptions ?? throw new ArgumentNullException(nameof(changeFeedPaginationOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,21 @@ internal sealed class ChangeFeedSuccessPage : ChangeFeedPage

public ChangeFeedSuccessPage(
Stream content,
double requestCharge,
double requestCharge,
int itemCount,
string activityId,
IReadOnlyDictionary<string, string> additionalHeaders,
ChangeFeedState state)
: base(requestCharge, activityId, additionalHeaders, state)
{
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.Content = content ?? throw new ArgumentNullException(nameof(content));
this.ItemCount = itemCount;
}

public Stream Content { get; }

public Stream Content { get; }

public override int ItemCount { get; }

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,24 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Pagination
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Tracing;

internal sealed class CrossPartitionChangeFeedAsyncEnumerator : IAsyncEnumerator<TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>>
internal sealed class CrossPartitionChangeFeedAsyncEnumerator : ITracingAsyncEnumerator<TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>>
{
private readonly CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator;
private CancellationToken cancellationToken;
private TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>>? bufferedException;

private CrossPartitionChangeFeedAsyncEnumerator(
CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator,
CancellationToken cancellationToken)
CrossPartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> crossPartitionEnumerator)
{
this.crossPartitionEnumerator = crossPartitionEnumerator ?? throw new ArgumentNullException(nameof(crossPartitionEnumerator));
this.cancellationToken = cancellationToken;
}

public TryCatch<CrossFeedRangePage<ChangeFeedPage, ChangeFeedState>> Current { get; private set; }

public ValueTask DisposeAsync() => this.crossPartitionEnumerator.DisposeAsync();

public ValueTask<bool> MoveNextAsync()
{
return this.MoveNextAsync(NoOpTrace.Singleton);
}

public async ValueTask<bool> MoveNextAsync(ITrace trace)
public async ValueTask<bool> MoveNextAsync(ITrace trace, CancellationToken cancellationToken)
{
this.cancellationToken.ThrowIfCancellationRequested();
cancellationToken.ThrowIfCancellationRequested();

if (trace == null)
{
Expand All @@ -53,7 +45,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
return true;
}

if (!await this.crossPartitionEnumerator.MoveNextAsync(changeFeedMoveNextTrace))
if (!await this.crossPartitionEnumerator.MoveNextAsync(changeFeedMoveNextTrace, cancellationToken))
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
}
Expand All @@ -80,7 +72,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
double totalRequestCharge = backendPage.RequestCharge;
do
{
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages))
if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages, cancellationToken))
{
throw new InvalidOperationException("ChangeFeed should always have a next page.");
}
Expand All @@ -107,15 +99,16 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
{
backendPage = new ChangeFeedSuccessPage(
changeFeedSuccessPage.Content,
totalRequestCharge,
totalRequestCharge,
changeFeedSuccessPage.ItemCount,
changeFeedSuccessPage.ActivityId,
changeFeedSuccessPage.AdditionalHeaders,
changeFeedSuccessPage.State);
}
else
{
backendPage = new ChangeFeedNotModifiedPage(
totalRequestCharge,
totalRequestCharge,
backendPage.ActivityId,
backendPage.AdditionalHeaders,
backendPage.State);
Expand All @@ -133,17 +126,10 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
}
}

public void SetCancellationToken(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
this.crossPartitionEnumerator.SetCancellationToken(cancellationToken);
}

public static CrossPartitionChangeFeedAsyncEnumerator Create(
IDocumentContainer documentContainer,
CrossFeedRangeState<ChangeFeedState> state,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken)
ChangeFeedPaginationOptions changeFeedPaginationOptions)
{
changeFeedPaginationOptions ??= ChangeFeedPaginationOptions.Default;

Expand All @@ -156,17 +142,14 @@ public static CrossPartitionChangeFeedAsyncEnumerator Create(
documentContainer,
CrossPartitionChangeFeedAsyncEnumerator.MakeCreateFunction(
documentContainer,
changeFeedPaginationOptions,
cancellationToken),
changeFeedPaginationOptions),
comparer: default /* this uses a regular queue instead of priority queue */,
maxConcurrency: default,
prefetchPolicy: PrefetchPolicy.PrefetchSinglePage,
cancellationToken: cancellationToken,
state: state);

CrossPartitionChangeFeedAsyncEnumerator enumerator = new CrossPartitionChangeFeedAsyncEnumerator(
crossPartitionEnumerator,
cancellationToken);
crossPartitionEnumerator);

return enumerator;
}
Expand All @@ -181,11 +164,9 @@ private static bool IsNextRangeEqualToOriginal(

private static CreatePartitionRangePageAsyncEnumerator<ChangeFeedPage, ChangeFeedState> MakeCreateFunction(
IChangeFeedDataSource changeFeedDataSource,
ChangeFeedPaginationOptions changeFeedPaginationOptions,
CancellationToken cancellationToken) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
ChangeFeedPaginationOptions changeFeedPaginationOptions) => (FeedRangeState<ChangeFeedState> feedRangeState) => new ChangeFeedPartitionRangePageAsyncEnumerator(
changeFeedDataSource,
feedRangeState,
changeFeedPaginationOptions,
cancellationToken);
changeFeedPaginationOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@ internal sealed class BufferedPartitionRangePageAsyncEnumerator<TPage, TState> :
where TState : State
{
private readonly PartitionRangePageAsyncEnumerator<TPage, TState> enumerator;
private TryCatch<TPage>? bufferedPage;
private TryCatch<TPage>? bufferedPage;

public override Exception BufferedException
{
get
{
if (this.bufferedPage.HasValue && this.bufferedPage.Value.Failed)
{
return this.bufferedPage.Value.Exception;
}

return null;
}
}

public override int BufferedItemCount => this.bufferedPage.HasValue && this.bufferedPage.Value.Succeeded ?
this.bufferedPage.Value.Result.ItemCount :
0;

public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator, CancellationToken cancellationToken)
: base(enumerator.FeedRangeState, cancellationToken)
public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator)
: base(enumerator.FeedRangeState)
{
this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
}
Expand Down Expand Up @@ -59,15 +76,9 @@ public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken ca

using (ITrace prefetchTrace = trace.StartChild("Prefetch", TraceComponent.Pagination, TraceLevel.Info))
{
await this.enumerator.MoveNextAsync(prefetchTrace);
await this.enumerator.MoveNextAsync(prefetchTrace, cancellationToken);
this.bufferedPage = this.enumerator.Current;
}
}

public override void SetCancellationToken(CancellationToken cancellationToken)
{
base.SetCancellationToken(cancellationToken);
this.enumerator.SetCancellationToken(cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Pagination
{
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Tracing;
Expand All @@ -12,10 +13,14 @@ internal abstract class BufferedPartitionRangePageAsyncEnumeratorBase<TPage, TSt
where TPage : Page<TState>
where TState : State
{
protected BufferedPartitionRangePageAsyncEnumeratorBase(FeedRangeState<TState> feedRangeState, CancellationToken cancellationToken)
: base(feedRangeState, cancellationToken)
protected BufferedPartitionRangePageAsyncEnumeratorBase(FeedRangeState<TState> feedRangeState)
: base(feedRangeState)
{
}
}

public abstract Exception BufferedException { get; }

public abstract int BufferedItemCount { get; }

public abstract ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken);
}
Expand Down
6 changes: 4 additions & 2 deletions Microsoft.Azure.Cosmos/src/Pagination/CrossFeedRangePage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ public CrossFeedRangePage(TBackendPage backendEndPage, CrossFeedRangeState<TBack
this.Page = backendEndPage;
}

public TBackendPage Page { get; }

public TBackendPage Page { get; }

public override int ItemCount => this.Page.ItemCount;

protected override ImmutableHashSet<string> DerivedClassBannedHeaders => bannedHeaders;
}
}
Loading

0 comments on commit 5788326

Please sign in to comment.