Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal Query: Adds adoption of pagination lib #1812

Merged
merged 107 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 105 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
86a0dc4
rename
bchong95 Jun 23, 2020
567b8ff
drafted out abstract classes
bchong95 Jun 23, 2020
1ad7e9c
drafted out basic in memory collection
bchong95 Jun 23, 2020
3e8f9f6
got basic crud test working
bchong95 Jun 23, 2020
2f3fef0
implemented split and readfeed
bchong95 Jun 23, 2020
f229619
switching tracks to try asyncenumerable
bchong95 Jun 24, 2020
66f5db0
started playing with async enumerator
bchong95 Jun 25, 2020
157617d
added full IAsyncEnumerable interface
bchong95 Jun 25, 2020
4ce4a13
made the single partition range enumerator more plug and play
bchong95 Jun 25, 2020
4e62c7f
got in memory partition range enumerator working
bchong95 Jun 26, 2020
79e3d32
cleaned up code
bchong95 Jun 26, 2020
fddb47f
got cross partition working
bchong95 Jun 26, 2020
1a618c6
resolved iteration comments
bchong95 Jun 26, 2020
3ec6951
merged
bchong95 Jun 26, 2020
3bf1fa4
started this query integration work, but need to make minor edits
bchong95 Jun 29, 2020
4aba7b7
Merge branch 'master' into users/brchon/ItemProducerRefactor
bchong95 Jun 29, 2020
7ab5e11
made the continuation token part of the page
bchong95 Jun 29, 2020
595f6ad
added 429 tests
bchong95 Jun 29, 2020
0b4a808
refactored tests to have more common code
bchong95 Jun 30, 2020
2e1b827
added an empty page test
bchong95 Jun 30, 2020
d34aa41
drafted out QueryPartitionRangePageEnumerator
bchong95 Jun 30, 2020
b099bd0
added basic query support
bchong95 Jun 30, 2020
254f3f8
made code templatized
bchong95 Jul 1, 2020
ffcefdb
got query partition provider working
bchong95 Jul 1, 2020
c7909b9
drafted all parallel query enumerator
bchong95 Jul 1, 2020
51fb6a8
got aggregates working
bchong95 Jul 2, 2020
647af75
got skip working
bchong95 Jul 2, 2020
4b599b5
adding take stage
bchong95 Jul 2, 2020
0fd47fd
added distinct
bchong95 Jul 3, 2020
ca4b86f
got group by working
bchong95 Jul 3, 2020
2e40aee
got the pipeline working
bchong95 Jul 5, 2020
12e8849
wired through query stages
bchong95 Jul 7, 2020
b050521
started parallel cpq debugging
bchong95 Jul 8, 2020
84e907b
not putting back finished enumerators
bchong95 Jul 8, 2020
51dc129
need to stop using feedrange
bchong95 Jul 9, 2020
c62e1f3
moved to pkrange instead of feed range
bchong95 Jul 9, 2020
797a761
fixed baseline
bchong95 Jul 9, 2020
e283503
merged in partition key range
bchong95 Jul 10, 2020
bb85500
got basic cases working
bchong95 Jul 10, 2020
27afdbc
got continuation token support working
bchong95 Jul 10, 2020
33080a0
simplified query draining with continuation token path
bchong95 Jul 10, 2020
7fbb306
resolved iteration comment by adding DocumentContainer as an abstract…
bchong95 Jul 13, 2020
34a9139
stopped returning 429 for get partition key ranges, since there isn't…
bchong95 Jul 14, 2020
6365de5
updated tests
bchong95 Jul 14, 2020
2d630f4
merged
bchong95 Jul 14, 2020
6737a04
added document container interface
bchong95 Jul 14, 2020
ba3259a
merged
bchong95 Jul 14, 2020
da3fb94
added end to end tests
bchong95 Jul 15, 2020
c8a5cfa
got basic query working end to end
bchong95 Jul 15, 2020
61ed005
composition over inheritance
bchong95 Jul 18, 2020
5adb713
merge
bchong95 Jul 20, 2020
464d5cc
started order by, but need the first PR to go through. To many merge …
bchong95 Jul 20, 2020
6c12d62
Merge branch 'master' into users/brchon/ItemProducerRefactor
bchong95 Jul 24, 2020
965bdf3
fix build errors
bchong95 Jul 24, 2020
c41d5c1
merged
bchong95 Jul 25, 2020
3aafdc2
merged
bchong95 Jul 25, 2020
bfed619
need to make serious changes for order by
bchong95 Jul 29, 2020
da5d2d0
Revert "Internal Query : Fix Remove Antlr dependency for now (#1626)"
bchong95 Jul 30, 2020
48a9c7c
added test for antlr dependancy
bchong95 Jul 30, 2020
a8b8ef2
Merge branch 'master' into revert-1626-users/brchon/Query/DisableAntlr
bchong95 Jul 30, 2020
a0ba53c
Merge branch 'master' into revert-1626-users/brchon/Query/DisableAntlr
bchong95 Jul 31, 2020
5006be3
need to wire up the item enumerator to order by
bchong95 Jul 31, 2020
e0f8a07
need to refactor for failures inside of the filter logic
bchong95 Aug 3, 2020
c699205
drafted out a solution without splits and continuation token
bchong95 Aug 3, 2020
c299f7d
got basic order by working
bchong95 Aug 3, 2020
5fa628b
Merge branch 'master' into revert-1626-users/brchon/Query/DisableAntlr
bchong95 Aug 5, 2020
9c37663
need to get the parser involved for this one
bchong95 Aug 5, 2020
8ec4ce2
added monadic parser
bchong95 Aug 5, 2020
18dc83c
whoops
bchong95 Aug 6, 2020
5baf322
merged
bchong95 Aug 6, 2020
eaeab62
hooked up parser and offline engine to the inmemorycontainer
bchong95 Aug 6, 2020
555c9d6
got continuation token support working
bchong95 Aug 8, 2020
c63264d
moved around some files
bchong95 Aug 8, 2020
a75df5d
removed old execution component code
bchong95 Aug 8, 2020
c980953
moved continuation tokens around
bchong95 Aug 8, 2020
c4953c6
got basic order by working end to end
bchong95 Aug 8, 2020
243128c
need to handle splits
bchong95 Aug 27, 2020
1633dc9
merged
bchong95 Aug 27, 2020
f93cdf0
got split working for order by
bchong95 Aug 28, 2020
3097cce
refactored to be a state machine
bchong95 Aug 28, 2020
ee74df3
fixed bug where partiton key ranges where matched to a continuation d…
bchong95 Aug 29, 2020
ba264d0
added parallel buffering
bchong95 Sep 1, 2020
ff1d0b3
merged
bchong95 Sep 26, 2020
7bec2f3
resolved iteration comments
bchong95 Sep 26, 2020
e905d16
fixed some non baseline tests
bchong95 Sep 26, 2020
c0fec4e
updated baselines
bchong95 Sep 26, 2020
7746b89
honoring ranges and compressed continuation tokens for parallel
bchong95 Sep 26, 2020
fa4ef5a
wired through cancellation token
bchong95 Sep 27, 2020
14b2f01
fixed random integration bugs
bchong95 Sep 29, 2020
2ba7a5a
fixed some test cases
bchong95 Sep 30, 2020
78c2f98
fixed test
bchong95 Sep 30, 2020
0ebb26c
fixed page size bugs with group by
bchong95 Oct 2, 2020
c12323e
removed unused collections
bchong95 Oct 2, 2020
645f3e4
opting for simple resource id:
bchong95 Oct 3, 2020
6f9c2af
fixed bug with JOINs + ORDER BY + Continuation tokens
bchong95 Oct 3, 2020
eba6d91
left some comments
bchong95 Oct 3, 2020
ef50dbe
merged
bchong95 Oct 3, 2020
181e0af
fixed bugs
bchong95 Oct 4, 2020
0cdb136
added split tests for parallel and patched bugs in the inmemorycontainer
bchong95 Oct 4, 2020
88f7ca2
fixed more tests
bchong95 Oct 4, 2020
f679971
setting feed iterator flag for non retriable exception
bchong95 Oct 4, 2020
1596a71
apparently we don't actually handle name cache is stale exceptions
bchong95 Oct 4, 2020
01f4dd6
Merge branch 'master' into users/brchon/Query/UsePaginationLib
bchong95 Oct 5, 2020
cc09d26
Merge branch 'master' into users/brchon/Query/UsePaginationLib
bchong95 Oct 5, 2020
f069c81
dummy commit
bchong95 Oct 5, 2020
0bd58c8
Merge branch 'master' into users/brchon/Query/UsePaginationLib
bchong95 Oct 6, 2020
58577ba
Merge branch 'master' into users/brchon/Query/UsePaginationLib
j82w Oct 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Routing;
using Newtonsoft.Json;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Routing;

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/ClientRetryPolicy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public async Task<ShouldRetryResult> ShouldRetryAsync(

this.retryContext = null;
// Received Connection error (HttpRequestException), initiate the endpoint rediscovery
if (exception is HttpRequestException)
if (exception is HttpRequestException httpException)
{
DefaultTrace.TraceWarning("Endpoint not reachable. Refresh cache and retry");
return await this.ShouldRetryOnEndpointFailureAsync(this.isReadRequest, false);
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Cosmos/src/CosmosElements/CosmosArray.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Microsoft.Azure.Cosmos.CosmosElements
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent.Distinct;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct;

#if INTERNAL
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Microsoft.Azure.Cosmos.CosmosElements
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionComponent.Distinct;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Distinct;

#if INTERNAL
#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Microsoft.Azure.Cosmos
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Newtonsoft.Json;

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ public override Task<TResult> AcceptAsync<TResult>(
return visitor.VisitAsync(this, cancellationToken);
}

public override string ToString()
{
return this.Range.ToString();
}
public override Task<TResult> AcceptAsync<TResult, TArg>(
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
IFeedRangeAsyncVisitor<TResult, TArg> visitor,
TArg argument,
CancellationToken cancellationToken) => visitor.VisitAsync(this, argument, cancellationToken);

public override string ToString() => this.Range.ToString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(

public abstract Task<TResult> AcceptAsync<TResult>(IFeedRangeAsyncVisitor<TResult> visitor, CancellationToken cancellationToken = default);

public abstract Task<TResult> AcceptAsync<TResult, TArg>(
IFeedRangeAsyncVisitor<TResult, TArg> visitor,
TArg argument,
CancellationToken cancellationToken);

public abstract override string ToString();

public override string ToJsonString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ public override Task<TResult> AcceptAsync<TResult>(
return visitor.VisitAsync(this, cancellationToken);
}

public override string ToString()
{
return this.PartitionKey.InternalKey.ToJsonString();
}
public override Task<TResult> AcceptAsync<TResult, TArg>(
IFeedRangeAsyncVisitor<TResult, TArg> visitor,
TArg argument,
CancellationToken cancellationToken) => visitor.VisitAsync(this, argument, cancellationToken);

public override string ToString() => this.PartitionKey.InternalKey.ToJsonString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ public override Task<TResult> AcceptAsync<TResult>(
return visitor.VisitAsync(this, cancellationToken);
}

public override string ToString()
{
return this.PartitionKeyRangeId;
}
public override Task<TResult> AcceptAsync<TResult, TArg>(
IFeedRangeAsyncVisitor<TResult, TArg> visitor,
TArg argument,
CancellationToken cancellationToken) => visitor.VisitAsync(this, argument, cancellationToken);

public override string ToString() => this.PartitionKeyRangeId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System.Threading;
using System.Threading.Tasks;

internal interface IFeedRangeAsyncVisitor<TResult, TArg>
{
public abstract Task<TResult> VisitAsync(FeedRangePartitionKey feedRange, TArg argument, CancellationToken cancellationToken);

public abstract Task<TResult> VisitAsync(FeedRangePartitionKeyRange feedRange, TArg argument, CancellationToken cancellationToken);

public abstract Task<TResult> VisitAsync(FeedRangeEpk feedRange, TArg argument, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.Azure.Cosmos.Handlers
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Routing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ internal CosmosQueryResponseMessageHeaders CloneKnownProperties(
internal static CosmosQueryResponseMessageHeaders ConvertToQueryHeaders(
Headers sourceHeaders,
ResourceType resourceType,
string containerRid)
string containerRid,
int? substatusCode = null,
string activityId = null)
{
if (sourceHeaders == null)
{
Expand All @@ -98,11 +100,11 @@ internal static CosmosQueryResponseMessageHeaders ConvertToQueryHeaders(
{
RequestCharge = sourceHeaders.RequestCharge,
ContentLength = sourceHeaders.ContentLength,
ActivityId = sourceHeaders.ActivityId,
ActivityId = sourceHeaders.ActivityId ?? activityId,
ETag = sourceHeaders.ETag,
Location = sourceHeaders.Location,
RetryAfterLiteral = sourceHeaders.RetryAfterLiteral,
SubStatusCodeLiteral = sourceHeaders.SubStatusCodeLiteral,
SubStatusCodeLiteral = sourceHeaders.SubStatusCodeLiteral ?? (substatusCode.HasValue ? substatusCode.Value.ToString() : null),
ContentType = sourceHeaders.ContentType,
QueryMetricsText = sourceHeaders.QueryMetricsText
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Pagination
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.Query.Core.Monads;

internal sealed class BufferedPartitionRangePageAsyncEnumerator<TPage, TState> : PartitionRangePageAsyncEnumerator<TPage, TState>, IPrefetcher
where TPage : Page<TState>
where TState : State
{
private readonly PartitionRangePageAsyncEnumerator<TPage, TState> enumerator;
private TryCatch<TPage>? bufferedPage;

public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerator<TPage, TState> enumerator, CancellationToken cancellationToken)
: base(enumerator.Range, cancellationToken, enumerator.State)
{
this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
}

public override ValueTask DisposeAsync() => this.enumerator.DisposeAsync();

protected override async Task<TryCatch<TPage>> GetNextPageAsync(CancellationToken cancellationToken)
{
await this.PrefetchAsync(cancellationToken);

// Serve from the buffered page first.
TryCatch<TPage> returnValue = this.bufferedPage.Value;
this.bufferedPage = null;
return returnValue;
}

public async ValueTask PrefetchAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

if (this.bufferedPage.HasValue)
{
return;
}

await this.enumerator.MoveNextAsync();
this.bufferedPage = this.enumerator.Current;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ internal sealed class CrossPartitionRangePageAsyncEnumerable<TPage, TState> : IA
private readonly CreatePartitionRangePageAsyncEnumerator<TPage, TState> createPartitionRangeEnumerator;
private readonly IComparer<PartitionRangePageAsyncEnumerator<TPage, TState>> comparer;
private readonly IFeedRangeProvider feedRangeProvider;
private readonly int maxConcurrency;

public CrossPartitionRangePageAsyncEnumerable(
IFeedRangeProvider feedRangeProvider,
CreatePartitionRangePageAsyncEnumerator<TPage, TState> createPartitionRangeEnumerator,
IComparer<PartitionRangePageAsyncEnumerator<TPage, TState>> comparer,
int maxConcurrency,
CrossPartitionState<TState> state = default)
{
this.feedRangeProvider = feedRangeProvider ?? throw new ArgumentNullException(nameof(comparer));
this.createPartitionRangeEnumerator = createPartitionRangeEnumerator ?? throw new ArgumentNullException(nameof(createPartitionRangeEnumerator));
this.comparer = comparer ?? throw new ArgumentNullException(nameof(comparer));
this.state = state;
this.maxConcurrency = maxConcurrency < 0 ? throw new ArgumentOutOfRangeException(nameof(maxConcurrency)) : maxConcurrency;
}

public IAsyncEnumerator<TryCatch<CrossPartitionPage<TPage, TState>>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
Expand All @@ -38,6 +41,8 @@ public IAsyncEnumerator<TryCatch<CrossPartitionPage<TPage, TState>>> GetAsyncEnu
this.feedRangeProvider,
this.createPartitionRangeEnumerator,
this.comparer,
this.maxConcurrency,
cancellationToken,
this.state);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Microsoft.Azure.Cosmos.Pagination
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -24,21 +25,25 @@ internal sealed class CrossPartitionRangePageAsyncEnumerator<TPage, TState> : IA
private readonly IFeedRangeProvider feedRangeProvider;
private readonly CreatePartitionRangePageAsyncEnumerator<TPage, TState> createPartitionRangeEnumerator;
private readonly AsyncLazy<PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>>> lazyEnumerators;
private CancellationToken cancellationToken;

public CrossPartitionRangePageAsyncEnumerator(
IFeedRangeProvider feedRangeProvider,
CreatePartitionRangePageAsyncEnumerator<TPage, TState> createPartitionRangeEnumerator,
IComparer<PartitionRangePageAsyncEnumerator<TPage, TState>> comparer,
int? maxConcurrency,
CancellationToken cancellationToken,
CrossPartitionState<TState> state = default)
{
this.feedRangeProvider = feedRangeProvider ?? throw new ArgumentNullException(nameof(feedRangeProvider));
this.createPartitionRangeEnumerator = createPartitionRangeEnumerator ?? throw new ArgumentNullException(nameof(createPartitionRangeEnumerator));

if (comparer == null)
{
throw new ArgumentNullException(nameof(comparer));
}

this.feedRangeProvider = feedRangeProvider ?? throw new ArgumentNullException(nameof(feedRangeProvider));
this.createPartitionRangeEnumerator = createPartitionRangeEnumerator ?? throw new ArgumentNullException(nameof(createPartitionRangeEnumerator));
this.cancellationToken = cancellationToken;

this.lazyEnumerators = new AsyncLazy<PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>>>(async (CancellationToken token) =>
{
IReadOnlyList<(PartitionKeyRange, TState)> rangeAndStates;
Expand All @@ -60,13 +65,23 @@ public CrossPartitionRangePageAsyncEnumerator(
rangeAndStates = rangesAndStatesBuilder;
}

PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators = new PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>>(comparer);
foreach ((PartitionKeyRange range, TState rangeState) in rangeAndStates)
List<BufferedPartitionRangePageAsyncEnumerator<TPage, TState>> bufferedEnumerators = rangeAndStates
.Select(rangeAndState =>
{
PartitionRangePageAsyncEnumerator<TPage, TState> enumerator = createPartitionRangeEnumerator(rangeAndState.Item1, rangeAndState.Item2);
BufferedPartitionRangePageAsyncEnumerator<TPage, TState> bufferedEnumerator = new BufferedPartitionRangePageAsyncEnumerator<TPage, TState>(enumerator, cancellationToken);
return bufferedEnumerator;
})
.ToList();

if (maxConcurrency.HasValue)
{
PartitionRangePageAsyncEnumerator<TPage, TState> enumerator = createPartitionRangeEnumerator(range, rangeState);
enumerators.Enqueue(enumerator);
await ParallelPrefetch.PrefetchInParallelAsync(bufferedEnumerators, maxConcurrency.Value, token);
}

PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators = new PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>>(
bufferedEnumerators,
comparer);
return enumerators;
});
}
Expand All @@ -75,7 +90,9 @@ public CrossPartitionRangePageAsyncEnumerator(

public async ValueTask<bool> MoveNextAsync()
{
PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators = await this.lazyEnumerators.GetValueAsync(cancellationToken: default);
this.cancellationToken.ThrowIfCancellationRequested();

PriorityQueue<PartitionRangePageAsyncEnumerator<TPage, TState>> enumerators = await this.lazyEnumerators.GetValueAsync(cancellationToken: this.cancellationToken);
if (enumerators.Count == 0)
{
return false;
Expand Down Expand Up @@ -103,7 +120,7 @@ public async ValueTask<bool> MoveNextAsync()
// Handle split
IEnumerable<PartitionKeyRange> childRanges = await this.feedRangeProvider.GetChildRangeAsync(
currentPaginator.Range,
cancellationToken: default);
cancellationToken: this.cancellationToken);
foreach (PartitionKeyRange childRange in childRanges)
{
PartitionRangePageAsyncEnumerator<TPage, TState> childPaginator = this.createPartitionRangeEnumerator(
Expand All @@ -120,18 +137,18 @@ public async ValueTask<bool> MoveNextAsync()
{
throw new NotImplementedException();
}
}

if (currentPaginator.State != null)
{
// Just enqueue the paginator and the user can decide if they want to retry.
enumerators.Enqueue(currentPaginator);

this.Current = TryCatch<CrossPartitionPage<TPage, TState>>.FromException(currentPaginator.Current.Exception);
return true;
}

TryCatch<TPage> backendPage = currentPaginator.Current;
if (backendPage.Failed)
if (currentPaginator.State != default)
{
this.Current = TryCatch<CrossPartitionPage<TPage, TState>>.FromException(backendPage.Exception);
return true;
// Don't enqueue the paginator otherwise it's an infinite loop.
enumerators.Enqueue(currentPaginator);
}

CrossPartitionState<TState> crossPartitionState;
Expand All @@ -151,7 +168,7 @@ public async ValueTask<bool> MoveNextAsync()
}

this.Current = TryCatch<CrossPartitionPage<TPage, TState>>.FromResult(
new CrossPartitionPage<TPage, TState>(backendPage.Result, crossPartitionState));
new CrossPartitionPage<TPage, TState>(currentPaginator.Current.Result, crossPartitionState));
return true;
}

Expand All @@ -161,6 +178,11 @@ public ValueTask DisposeAsync()
return default;
}

public void SetCancellationToken(CancellationToken cancellationToken)
{
this.cancellationToken = cancellationToken;
}

private static bool IsSplitException(Exception exeception)
{
return exeception is CosmosException cosmosException
Expand Down
Loading