From e64a70e3816b193c11dec8e199aa7fc3d55c3197 Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Mon, 2 May 2022 17:00:05 -0700 Subject: [PATCH 1/8] Add an aggressive prefetch policy and a fully buffered partition range page enumerator to match --- ...CrossPartitionChangeFeedAsyncEnumerator.cs | 7 +- ...fferedPartitionRangePageAsyncEnumerator.cs | 21 +-- ...edPartitionRangePageAsyncEnumeratorBase.cs | 22 +++ .../CrossPartitionRangePageAsyncEnumerator.cs | 135 +++++++++++------- ...lyBufferedPartitionRangeAsyncEnumerator.cs | 97 +++++++++++++ .../src/Pagination/PrefetchPolicy.cs | 11 ++ .../CosmosQueryExecutionContextFactory.cs | 1 + ...arallelCrossPartitionQueryPipelineStage.cs | 8 +- .../Query/Core/Pipeline/PipelineFactory.cs | 15 ++ .../CrossPartitionReadFeedAsyncEnumerator.cs | 6 +- ...sPartitionPartitionRangeEnumeratorTests.cs | 4 + .../CrossPartitionRangePageAsyncEnumerable.cs | 4 + ...elCrossPartitionQueryPipelineStageTests.cs | 35 +++-- 13 files changed, 290 insertions(+), 76 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumeratorBase.cs create mode 100644 Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs create mode 100644 Microsoft.Azure.Cosmos/src/Pagination/PrefetchPolicy.cs diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs index b04d8ba792..a359a9baaf 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs @@ -158,10 +158,11 @@ public static CrossPartitionChangeFeedAsyncEnumerator Create( documentContainer, changeFeedPaginationOptions, cancellationToken), - comparer: default /* this uses a regular queue instead of prioirty queue */, + comparer: default /* this uses a regular queue instead of priority queue */, maxConcurrency: default, - cancellationToken, - state); + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, + cancellationToken: cancellationToken, + state: state); CrossPartitionChangeFeedAsyncEnumerator enumerator = new CrossPartitionChangeFeedAsyncEnumerator( crossPartitionEnumerator, diff --git a/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumerator.cs index 37fbc82aec..de689618f6 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumerator.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.Cosmos.Pagination using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Tracing; - internal sealed class BufferedPartitionRangePageAsyncEnumerator : PartitionRangePageAsyncEnumerator, IPrefetcher + internal sealed class BufferedPartitionRangePageAsyncEnumerator : BufferedPartitionRangePageAsyncEnumeratorBase where TPage : Page where TState : State { @@ -23,7 +23,10 @@ public BufferedPartitionRangePageAsyncEnumerator(PartitionRangePageAsyncEnumerat this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); } - public override ValueTask DisposeAsync() => this.enumerator.DisposeAsync(); + public override ValueTask DisposeAsync() + { + return this.enumerator.DisposeAsync(); + } protected override async Task> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken) { @@ -40,20 +43,22 @@ protected override async Task> GetNextPageAsync(ITrace trace, Ca return returnValue; } - public async ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken) + public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken) { if (trace == null) { throw new ArgumentNullException(nameof(trace)); } - using (ITrace prefetchTrace = trace.StartChild("Prefetch", TraceComponent.Pagination, TraceLevel.Info)) + if (this.bufferedPage.HasValue) { - if (this.bufferedPage.HasValue) - { - return; - } + return; + } + cancellationToken.ThrowIfCancellationRequested(); + + using (ITrace prefetchTrace = trace.StartChild("Prefetch", TraceComponent.Pagination, TraceLevel.Info)) + { await this.enumerator.MoveNextAsync(prefetchTrace); this.bufferedPage = this.enumerator.Current; } diff --git a/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumeratorBase.cs b/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumeratorBase.cs new file mode 100644 index 0000000000..67af30c353 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Pagination/BufferedPartitionRangePageAsyncEnumeratorBase.cs @@ -0,0 +1,22 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Pagination +{ + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Tracing; + + internal abstract class BufferedPartitionRangePageAsyncEnumeratorBase : PartitionRangePageAsyncEnumerator, IPrefetcher + where TPage : Page + where TState : State + { + protected BufferedPartitionRangePageAsyncEnumeratorBase(FeedRangeState feedRangeState, CancellationToken cancellationToken) + : base(feedRangeState, cancellationToken) + { + } + + public abstract ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken); + } +} diff --git a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs index 97135a6201..603a8a97ef 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs @@ -34,6 +34,7 @@ public CrossPartitionRangePageAsyncEnumerator( CreatePartitionRangePageAsyncEnumerator createPartitionRangeEnumerator, IComparer> comparer, int? maxConcurrency, + PrefetchPolicy prefetchPolicy, CancellationToken cancellationToken, CrossFeedRangeState state = default) { @@ -41,57 +42,16 @@ public CrossPartitionRangePageAsyncEnumerator( this.createPartitionRangeEnumerator = createPartitionRangeEnumerator ?? throw new ArgumentNullException(nameof(createPartitionRangeEnumerator)); this.cancellationToken = cancellationToken; - this.lazyEnumerators = new AsyncLazy>>(async (ITrace trace, CancellationToken token) => - { - ReadOnlyMemory> rangeAndStates; - if (state != default) - { - rangeAndStates = state.Value; - } - else - { - // Fan out to all partitions with default state - List ranges = await feedRangeProvider.GetFeedRangesAsync(trace, token); - - List> rangesAndStatesBuilder = new List>(ranges.Count); - foreach (FeedRangeInternal range in ranges) - { - rangesAndStatesBuilder.Add(new FeedRangeState(range, default)); - } - - rangeAndStates = rangesAndStatesBuilder.ToArray(); - } - - List> bufferedEnumerators = new List>(rangeAndStates.Length); - for (int i = 0; i < rangeAndStates.Length; i++) - { - FeedRangeState feedRangeState = rangeAndStates.Span[i]; - PartitionRangePageAsyncEnumerator enumerator = createPartitionRangeEnumerator(feedRangeState); - BufferedPartitionRangePageAsyncEnumerator bufferedEnumerator = new BufferedPartitionRangePageAsyncEnumerator(enumerator, cancellationToken); - bufferedEnumerators.Add(bufferedEnumerator); - } - - if (maxConcurrency.HasValue) - { - await ParallelPrefetch.PrefetchInParallelAsync(bufferedEnumerators, maxConcurrency.Value, trace, token); - } - - IQueue> queue; - if (comparer == null) - { - queue = new QueueWrapper>( - new Queue>(bufferedEnumerators)); - } - else - { - queue = new PriorityQueueWrapper>( - new PriorityQueue>( - bufferedEnumerators, - comparer)); - } - - return queue; - }); + this.lazyEnumerators = new AsyncLazy>>((ITrace trace, CancellationToken token) => + InitializeEnumeratorsAsync( + feedRangeProvider, + createPartitionRangeEnumerator, + comparer, + maxConcurrency, + prefetchPolicy, + state, + trace, + token)); } public TryCatch> Current { get; private set; } @@ -281,6 +241,79 @@ private static bool IsSplitException(Exception exeception) return enumerators.Peek()?.FeedRangeState; } + private static async Task>> InitializeEnumeratorsAsync( + IFeedRangeProvider feedRangeProvider, + CreatePartitionRangePageAsyncEnumerator createPartitionRangeEnumerator, + IComparer> comparer, + int? maxConcurrency, + PrefetchPolicy prefetchPolicy, + CrossFeedRangeState state, + ITrace trace, + CancellationToken token) + { + ReadOnlyMemory> rangeAndStates; + if (state != default) + { + rangeAndStates = state.Value; + } + else + { + // Fan out to all partitions with default state + List ranges = await feedRangeProvider.GetFeedRangesAsync(trace, token); + + List> rangesAndStatesBuilder = new List>(ranges.Count); + foreach (FeedRangeInternal range in ranges) + { + rangesAndStatesBuilder.Add(new FeedRangeState(range, default)); + } + + rangeAndStates = rangesAndStatesBuilder.ToArray(); + } + + IReadOnlyList> bufferedEnumerators = CreateBufferedEnumerators( + prefetchPolicy, + createPartitionRangeEnumerator, + rangeAndStates, + token); + + if (maxConcurrency.HasValue) + { + await ParallelPrefetch.PrefetchInParallelAsync(bufferedEnumerators, maxConcurrency.Value, trace, token); + } + + IQueue> queue = comparer == null + ? new QueueWrapper>( + new Queue>(bufferedEnumerators)) + : new PriorityQueueWrapper>( + new PriorityQueue>( + bufferedEnumerators, + comparer)); + return queue; + } + + private static IReadOnlyList> CreateBufferedEnumerators( + PrefetchPolicy policy, + CreatePartitionRangePageAsyncEnumerator createPartitionRangeEnumerator, + ReadOnlyMemory> rangeAndStates, + CancellationToken cancellationToken) + { + List> bufferedEnumerators = new (rangeAndStates.Length); + for (int i = 0; i < rangeAndStates.Length; i++) + { + FeedRangeState feedRangeState = rangeAndStates.Span[i]; + PartitionRangePageAsyncEnumerator enumerator = createPartitionRangeEnumerator(feedRangeState); + BufferedPartitionRangePageAsyncEnumeratorBase bufferedEnumerator = policy switch + { + PrefetchPolicy.PrefetchSinglePage => new BufferedPartitionRangePageAsyncEnumerator(enumerator, cancellationToken), + PrefetchPolicy.PrefetchAll => new BufferedPartitionRangePageAsyncEnumerator(enumerator, cancellationToken), + _ => throw new ArgumentOutOfRangeException(nameof(policy)), + }; + bufferedEnumerators.Add(bufferedEnumerator); + } + + return bufferedEnumerators; + } + private interface IQueue : IEnumerable { T Peek(); diff --git a/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs new file mode 100644 index 0000000000..8fe9ec6855 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs @@ -0,0 +1,97 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Pagination +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Tracing; + + internal sealed class FullyBufferedPartitionRangeAsyncEnumerator : BufferedPartitionRangePageAsyncEnumeratorBase + where TPage : Page + where TState : State + { + private readonly PartitionRangePageAsyncEnumerator enumerator; + private readonly List bufferedPages; + private int index; + private Exception exception; + + private bool HasPrefetched => this.exception != null || this.bufferedPages.Count > 0; + + public FullyBufferedPartitionRangeAsyncEnumerator(PartitionRangePageAsyncEnumerator enumerator, CancellationToken cancellationToken) + : base(enumerator.FeedRangeState, cancellationToken) + { + this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); + this.bufferedPages = new List(); + this.index = 0; + this.exception = null; + } + + public override ValueTask DisposeAsync() + { + return this.enumerator.DisposeAsync(); + } + + public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken cancellationToken) + { + if (trace == null) + { + throw new ArgumentNullException(nameof(trace)); + } + + if (this.HasPrefetched) + { + return; + } + + cancellationToken.ThrowIfCancellationRequested(); + + using (ITrace prefetchTrace = trace.StartChild("Prefetch", TraceComponent.Pagination, TraceLevel.Info)) + { + while (this.exception == null && await this.enumerator.MoveNextAsync(prefetchTrace)) + { + cancellationToken.ThrowIfCancellationRequested(); + TryCatch current = this.enumerator.Current; + if (current.Succeeded) + { + this.bufferedPages.Add(current.Result); + } + else + { + this.exception = current.Exception; + } + } + } + } + + protected override async Task> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken) + { + TryCatch result; + if (this.index < this.bufferedPages.Count) + { + result = TryCatch.FromResult(this.bufferedPages[this.index]); + } + else if (this.index == this.bufferedPages.Count && this.exception != null) + { + result = TryCatch.FromException(this.exception); + } + else + { + await this.enumerator.MoveNextAsync(trace); + result = this.enumerator.Current; + } + + ++this.index; + return result; + } + + public override void SetCancellationToken(CancellationToken cancellationToken) + { + base.SetCancellationToken(cancellationToken); + this.enumerator.SetCancellationToken(cancellationToken); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Pagination/PrefetchPolicy.cs b/Microsoft.Azure.Cosmos/src/Pagination/PrefetchPolicy.cs new file mode 100644 index 0000000000..4f541b81e2 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Pagination/PrefetchPolicy.cs @@ -0,0 +1,11 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ +namespace Microsoft.Azure.Cosmos.Pagination +{ + internal enum PrefetchPolicy + { + PrefetchSinglePage, + PrefetchAll + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs index 3d411f01e6..a389fd5da6 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CosmosQueryExecutionContextFactory.cs @@ -379,6 +379,7 @@ private static TryCatch TryCreatePassthroughQueryExecutionC queryPaginationOptions: new QueryPaginationOptions( pageSizeHint: inputParameters.MaxItemCount), partitionKey: inputParameters.PartitionKey, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, maxConcurrency: inputParameters.MaxConcurrency, cancellationToken: cancellationToken, continuationToken: inputParameters.InitialUserContinuationToken); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/ParallelCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/ParallelCrossPartitionQueryPipelineStage.cs index 9bf4f1727d..c9ccb31653 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/ParallelCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/Parallel/ParallelCrossPartitionQueryPipelineStage.cs @@ -137,6 +137,7 @@ public static TryCatch MonadicCreate( Cosmos.PartitionKey? partitionKey, QueryPaginationOptions queryPaginationOptions, int maxConcurrency, + PrefetchPolicy prefetchPolicy, CosmosElement continuationToken, CancellationToken cancellationToken) { @@ -159,10 +160,11 @@ public static TryCatch MonadicCreate( CrossFeedRangeState state = monadicExtractState.Result; CrossPartitionRangePageAsyncEnumerator crossPartitionPageEnumerator = new CrossPartitionRangePageAsyncEnumerator( - documentContainer, - ParallelCrossPartitionQueryPipelineStage.MakeCreateFunction(documentContainer, sqlQuerySpec, queryPaginationOptions, partitionKey, cancellationToken), + feedRangeProvider: documentContainer, + createPartitionRangeEnumerator: ParallelCrossPartitionQueryPipelineStage.MakeCreateFunction(documentContainer, sqlQuerySpec, queryPaginationOptions, partitionKey, cancellationToken), comparer: Comparer.Singleton, - maxConcurrency, + maxConcurrency: maxConcurrency, + prefetchPolicy: prefetchPolicy, state: state, cancellationToken: cancellationToken); diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs index 3029990645..8a720bdb13 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs @@ -63,6 +63,8 @@ public static TryCatch MonadicCreate( sqlQuerySpec = !string.IsNullOrEmpty(queryInfo.RewrittenQuery) ? new SqlQuerySpec(queryInfo.RewrittenQuery, sqlQuerySpec.Parameters) : sqlQuerySpec; + PrefetchPolicy prefetchPolicy = DeterminePrefetchPolicy(queryInfo); + MonadicCreatePipelineStage monadicCreatePipelineStage; if (queryInfo.HasOrderBy) { @@ -87,6 +89,7 @@ public static TryCatch MonadicCreate( targetRanges: targetRanges, queryPaginationOptions: queryPaginationOptions, partitionKey: partitionKey, + prefetchPolicy: prefetchPolicy, maxConcurrency: maxConcurrency, continuationToken: continuationToken, cancellationToken: cancellationToken); @@ -178,5 +181,17 @@ public static TryCatch MonadicCreate( return monadicCreatePipelineStage(requestContinuationToken, requestCancellationToken) .Try(onSuccess: (stage) => new SkipEmptyPageQueryPipelineStage(stage, requestCancellationToken)); } + + private static PrefetchPolicy DeterminePrefetchPolicy(QueryInfo queryInfo) + { + if (!queryInfo.HasDCount && queryInfo.HasAggregates && !queryInfo.HasGroupBy) + { + return PrefetchPolicy.PrefetchAll; + } + else + { + return PrefetchPolicy.PrefetchSinglePage; + } + } } } diff --git a/Microsoft.Azure.Cosmos/src/ReadFeed/Pagination/CrossPartitionReadFeedAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/ReadFeed/Pagination/CrossPartitionReadFeedAsyncEnumerator.cs index d87d1bbdf1..5641318ec3 100644 --- a/Microsoft.Azure.Cosmos/src/ReadFeed/Pagination/CrossPartitionReadFeedAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/ReadFeed/Pagination/CrossPartitionReadFeedAsyncEnumerator.cs @@ -104,6 +104,7 @@ public static CrossPartitionReadFeedAsyncEnumerator Create( cancellationToken), comparer: comparer, maxConcurrency: default, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken, crossFeedRangeState); @@ -116,11 +117,14 @@ public static CrossPartitionReadFeedAsyncEnumerator Create( private static CreatePartitionRangePageAsyncEnumerator MakeCreateFunction( IReadFeedDataSource readFeedDataSource, ReadFeedPaginationOptions readFeedPaginationOptions, - CancellationToken cancellationToken) => (FeedRangeState feedRangeState) => new ReadFeedPartitionRangeEnumerator( + CancellationToken cancellationToken) + { + return (FeedRangeState feedRangeState) => new ReadFeedPartitionRangeEnumerator( readFeedDataSource, feedRangeState, readFeedPaginationOptions, cancellationToken); + } private sealed class PartitionRangePageAsyncEnumeratorComparerForward : IComparer> { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs index 17273542b9..7a7ace5f0f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs @@ -79,6 +79,7 @@ PartitionRangePageAsyncEnumerator createEnumerator( createPartitionRangeEnumerator: createEnumerator, comparer: null, maxConcurrency: 0, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, state: new CrossFeedRangeState( new FeedRangeState[] @@ -132,6 +133,7 @@ PartitionRangePageAsyncEnumerator createEnumerator( feedRangeProvider: feedRangeProvider.Object, createPartitionRangeEnumerator: createEnumerator, comparer: null, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, maxConcurrency: 0, cancellationToken: default, state: new CrossFeedRangeState( @@ -377,6 +379,7 @@ PartitionRangePageAsyncEnumerator createEnumerator( createPartitionRangeEnumerator: createEnumerator, comparer: PartitionRangePageAsyncEnumeratorComparer.Singleton, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, trace: NoOpTrace.Singleton, state: state ?? new CrossFeedRangeState( new FeedRangeState[] @@ -403,6 +406,7 @@ PartitionRangePageAsyncEnumerator createEnumerator( createPartitionRangeEnumerator: createEnumerator, comparer: PartitionRangePageAsyncEnumeratorComparer.Singleton, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: cancellationToken, state: state ?? new CrossFeedRangeState( new FeedRangeState[] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/CrossPartitionRangePageAsyncEnumerable.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/CrossPartitionRangePageAsyncEnumerable.cs index 4b8b4f868c..7b96480a5f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/CrossPartitionRangePageAsyncEnumerable.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/CrossPartitionRangePageAsyncEnumerable.cs @@ -21,6 +21,7 @@ internal sealed class CrossPartitionRangePageAsyncEnumerable : IA private readonly IComparer> comparer; private readonly IFeedRangeProvider feedRangeProvider; private readonly int maxConcurrency; + private readonly PrefetchPolicy prefetchPolicy; private readonly ITrace trace; public CrossPartitionRangePageAsyncEnumerable( @@ -28,6 +29,7 @@ public CrossPartitionRangePageAsyncEnumerable( CreatePartitionRangePageAsyncEnumerator createPartitionRangeEnumerator, IComparer> comparer, int maxConcurrency, + PrefetchPolicy prefetchPolicy, ITrace trace, CrossFeedRangeState state = default) { @@ -36,6 +38,7 @@ public CrossPartitionRangePageAsyncEnumerable( this.comparer = comparer ?? throw new ArgumentNullException(nameof(comparer)); this.state = state; this.maxConcurrency = maxConcurrency < 0 ? throw new ArgumentOutOfRangeException(nameof(maxConcurrency)) : maxConcurrency; + this.prefetchPolicy = prefetchPolicy; this.trace = trace ?? throw new ArgumentNullException(nameof(trace)); } @@ -49,6 +52,7 @@ public IAsyncEnumerator>> GetAsyncEnu this.createPartitionRangeEnumerator, this.comparer, this.maxConcurrency, + this.prefetchPolicy, cancellationToken, this.state), this.trace); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/ParallelCrossPartitionQueryPipelineStageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/ParallelCrossPartitionQueryPipelineStageTests.cs index 40bef384e5..e772996748 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/ParallelCrossPartitionQueryPipelineStageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/ParallelCrossPartitionQueryPipelineStageTests.cs @@ -37,6 +37,7 @@ public void MonadicCreate_NullContinuationToken() queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: null); Assert.IsTrue(monadicCreate.Succeeded); @@ -54,6 +55,7 @@ public void MonadicCreate_NonCosmosArrayContinuationToken() queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: CosmosObject.Create(new Dictionary())); Assert.IsTrue(monadicCreate.Failed); @@ -72,6 +74,7 @@ public void MonadicCreate_EmptyArrayContinuationToken() queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: CosmosArray.Create(new List())); Assert.IsTrue(monadicCreate.Failed); @@ -90,6 +93,7 @@ public void MonadicCreate_NonParallelContinuationToken() queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: CosmosArray.Create(new List() { CosmosString.Create("asdf") })); Assert.IsTrue(monadicCreate.Failed); @@ -112,6 +116,7 @@ public void MonadicCreate_SingleParallelContinuationToken() queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: CosmosArray.Create(new List() { ParallelContinuationToken.ToCosmosElement(token) })); Assert.IsTrue(monadicCreate.Succeeded); @@ -141,6 +146,7 @@ public void MonadicCreate_MultipleParallelContinuationToken() queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: CosmosArray.Create( new List() @@ -152,17 +158,25 @@ public void MonadicCreate_MultipleParallelContinuationToken() } [TestMethod] - [DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")] - [DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")] - [DataRow(false, true, false, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: false")] - [DataRow(false, true, true, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: true")] - [DataRow(true, false, false, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: false")] - [DataRow(true, false, true, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: true")] - [DataRow(true, true, false, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: false")] - [DataRow(true, true, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")] - public async Task TestDrainWithStateSplitsAndMergeAsync(bool useState, bool allowSplits, bool allowMerges) + [DataRow(false, false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")] + [DataRow(false, false, true, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")] + [DataRow(false, true, false, false, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: false")] + [DataRow(false, true, true, false, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: true")] + [DataRow(true, false, false, false, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: false")] + [DataRow(true, false, true, false, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: true")] + [DataRow(true, true, false, false, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: false")] + [DataRow(true, true, true, false, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")] + [DataRow(false, false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")] + [DataRow(false, false, true, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")] + [DataRow(false, true, false, true, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: false")] + [DataRow(false, true, true, true, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: true")] + [DataRow(true, false, false, true, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: false")] + [DataRow(true, false, true, true, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: true")] + [DataRow(true, true, false, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: false")] + [DataRow(true, true, true, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")] + public async Task TestDrainWithStateSplitsAndMergeAsync(bool useState, bool allowSplits, bool allowMerges, bool aggressivePrefetch) { - static async Task CreatePipelineStateAsync(IDocumentContainer documentContainer, CosmosElement continuationToken) + async Task CreatePipelineStateAsync(IDocumentContainer documentContainer, CosmosElement continuationToken) { TryCatch monadicQueryPipelineStage = ParallelCrossPartitionQueryPipelineStage.MonadicCreate( documentContainer: documentContainer, @@ -173,6 +187,7 @@ static async Task CreatePipelineStateAsync(IDocumentContain queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), partitionKey: null, maxConcurrency: 10, + prefetchPolicy: aggressivePrefetch ? PrefetchPolicy.PrefetchAll : PrefetchPolicy.PrefetchSinglePage, cancellationToken: default, continuationToken: continuationToken); Assert.IsTrue(monadicQueryPipelineStage.Succeeded); From 302cc46f873c6b6aa21e940ebe40a4547fdff23a Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Mon, 2 May 2022 17:54:00 -0700 Subject: [PATCH 2/8] Start setting up test scaffolding --- .../Pagination/BufferedPartitionRangeEnumeratorTests.cs | 2 +- .../CrossPartitionPartitionRangeEnumeratorTests.cs | 6 +++--- .../Pagination/PartitionRangeEnumeratorTests.cs | 4 +++- .../SinglePartitionPartitionRangeEnumeratorTests.cs | 2 +- .../OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs | 2 +- .../Query/QueryPartitionRangePageEnumeratorTests.cs | 2 +- 6 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs index 03dfc9a53c..f15d49cc86 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs @@ -203,7 +203,7 @@ public override IReadOnlyList GetRecordsFromPage(ReadFeedPage page) return page.GetRecords(); } - public override IAsyncEnumerable> CreateEnumerable( + protected override IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, ReadFeedState state = null) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs index 7a7ace5f0f..3545eb0495 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs @@ -218,7 +218,7 @@ protected override Task> GetNextPageAsync(ITrace trace, C public async Task TestSplitAndMergeAsync(bool useState, bool allowSplits, bool allowMerges) { Implementation implementation = new Implementation(singlePartition: false); - await implementation.TestSplitAndMergeImplementationAsync(useState, allowSplits, allowMerges); + await implementation.TestSplitAndMergeImplementationAsync(useState, allowSplits, allowMerges, false); } private sealed class Implementation : PartitionRangeEnumeratorTests, CrossFeedRangeState> @@ -301,7 +301,7 @@ public async Task TestMergeToSinglePartition() Assert.AreEqual(numItems, identifiers.Count); } - public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allowSplits, bool allowMerges) + public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allowSplits, bool allowMerges, bool aggressivePrefetch) { int numItems = 1000; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); @@ -363,7 +363,7 @@ public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allow Assert.AreEqual(numItems, identifiers.Count); } - public override IAsyncEnumerable>> CreateEnumerable( + protected override IAsyncEnumerable>> CreateEnumerable( IDocumentContainer inMemoryCollection, CrossFeedRangeState state = null) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs index ca6872b531..74ecb101c5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs @@ -163,7 +163,9 @@ public async Task TestEmptyPages() public abstract IReadOnlyList GetRecordsFromPage(TPage page); - public abstract IAsyncEnumerable> CreateEnumerable(IDocumentContainer documentContainer, TState state = null); + protected abstract IAsyncEnumerable> CreateEnumerable( + IDocumentContainer documentContainer, + TState state = null); public abstract IAsyncEnumerator> CreateEnumerator(IDocumentContainer documentContainer, TState state = null, CancellationToken cancellationToken= default); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs index 48c0621b7a..2c3e1b3212 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs @@ -122,7 +122,7 @@ public override IReadOnlyList GetRecordsFromPage(ReadFeedPage page) return page.GetRecords(); } - public override IAsyncEnumerable> CreateEnumerable( + protected override IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, ReadFeedState state = null) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs index ad9abcd36f..2055f748d2 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs @@ -36,7 +36,7 @@ public override IReadOnlyList GetRecordsFromPage(OrderByQueryPage page) throw new NotImplementedException(); } - public override IAsyncEnumerable> CreateEnumerable(IDocumentContainer documentContainer, QueryState state = null) + protected override IAsyncEnumerable> CreateEnumerable(IDocumentContainer documentContainer, QueryState state = null) { throw new NotImplementedException(); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs index 2f37cbd681..b161d1cc50 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs @@ -127,7 +127,7 @@ public override IReadOnlyList GetRecordsFromPage(QueryPage page) return records; } - public override IAsyncEnumerable> CreateEnumerable( + protected override IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, QueryState state = null) { From c23334340cdfee00f060e34f166a9862f20415c5 Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Tue, 3 May 2022 12:46:04 -0700 Subject: [PATCH 3/8] Fix up some broken unit tests --- .../TraceWriterBaselineTests.ScenariosAsync.xml | 11 +---------- .../Pagination/PartitionRangeEnumeratorTests.cs | 2 +- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml index b1935cd1b2..e1ffdb393d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml @@ -1471,7 +1471,6 @@ │ │ └── [,FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component 12:00:00:000 0.00 milliseconds │ │ └── Query Transport(00000000-0000-0000-0000-000000000000) Transport-Component 12:00:00:000 0.00 milliseconds │ ├── [,FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component 12:00:00:000 0.00 milliseconds - │ │ └── Prefetch(00000000-0000-0000-0000-000000000000) Pagination-Component 12:00:00:000 0.00 milliseconds │ ├── Get Child Ranges(00000000-0000-0000-0000-000000000000) Routing-Component 12:00:00:000 0.00 milliseconds │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component 12:00:00:000 0.00 milliseconds │ └── [,1F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component 12:00:00:000 0.00 milliseconds @@ -1589,15 +1588,7 @@ "name": "[,FF) move next", "id": "00000000-0000-0000-0000-000000000000", "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Prefetch", - "id": "00000000-0000-0000-0000-000000000000", - "start time": "12:00:00:000", - "duration in milliseconds": 0 - } - ] + "duration in milliseconds": 0 }, { "name": "Get Child Ranges", diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs index 74ecb101c5..b6e8b5bc4e 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs @@ -29,7 +29,7 @@ public async Task TestMoveNextAsyncThrowsTaskCanceledException() CancellationTokenSource cts = new CancellationTokenSource(); IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection, cancellationToken: cts.Token); cts.Cancel(); - await Assert.ThrowsExceptionAsync(async () => await enumerator.MoveNextAsync()); + await Assert.ThrowsExceptionAsync(async () => await enumerator.MoveNextAsync()); } [TestMethod] From 373ee7ad56ac144b1009aaa3fe42bf1034da8a0b Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Wed, 4 May 2022 11:22:06 -0700 Subject: [PATCH 4/8] Instantiate the correct type of buffered enumerator in cross partition enumerator --- .../CrossPartitionRangePageAsyncEnumerator.cs | 2 +- .../Query/Pipeline/FullPipelineTests.cs | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs index 603a8a97ef..a56a21d593 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs @@ -305,7 +305,7 @@ private static IReadOnlyList bufferedEnumerator = policy switch { PrefetchPolicy.PrefetchSinglePage => new BufferedPartitionRangePageAsyncEnumerator(enumerator, cancellationToken), - PrefetchPolicy.PrefetchAll => new BufferedPartitionRangePageAsyncEnumerator(enumerator, cancellationToken), + PrefetchPolicy.PrefetchAll => new FullyBufferedPartitionRangeAsyncEnumerator(enumerator, cancellationToken), _ => throw new ArgumentOutOfRangeException(nameof(policy)), }; bufferedEnumerators.Add(bufferedEnumerator); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs index 79fef002a8..d3507c7f4b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs @@ -162,8 +162,9 @@ public async Task OffsetLimit() [TestMethod] public async Task Aggregates() { + const int DocumentCount = 250; List documents = new List(); - for (int i = 0; i < 250; i++) + for (int i = 0; i < DocumentCount; i++) { documents.Add(CosmosObject.Parse($"{{\"pk\" : {i} }}")); } @@ -173,6 +174,14 @@ public async Task Aggregates() documents: documents); Assert.AreEqual(expected: 1, actual: documentsQueried.Count); + if (documentsQueried[0] is CosmosNumber number) + { + Assert.AreEqual(expected: DocumentCount, actual: Number64.ToLong(number.Value)); + } + else + { + Assert.Fail(); + } } [TestMethod] From 7511b96c1683f085c4d6349dad097f7fc46d2ac0 Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Wed, 4 May 2022 23:14:10 -0700 Subject: [PATCH 5/8] add some UTs for aggressive prefetching --- .../Pagination/InMemoryContainer.cs | 4 +- .../AggressivePrefetchPipelineTests.cs | 201 ++++++++++++++++++ .../Query/Pipeline/FullPipelineTests.cs | 22 +- 3 files changed, 221 insertions(+), 6 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs index c057331e83..c14edc02e5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs @@ -33,7 +33,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Pagination using ResourceIdentifier = Cosmos.Pagination.ResourceIdentifier; // Collection useful for mocking requests and repartitioning (splits / merge). - internal sealed class InMemoryContainer : IMonadicDocumentContainer + internal class InMemoryContainer : IMonadicDocumentContainer { private readonly PartitionKeyDefinition partitionKeyDefinition; private readonly Dictionary parentToChildMapping; @@ -449,7 +449,7 @@ public Task> MonadicReadFeedAsync( } } - public Task> MonadicQueryAsync( + public virtual Task> MonadicQueryAsync( SqlQuerySpec sqlQuerySpec, FeedRangeState feedRangeState, QueryPaginationOptions queryPaginationOptions, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs new file mode 100644 index 0000000000..4e30eba614 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs @@ -0,0 +1,201 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using FluentAssertions; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Pagination; + using Microsoft.Azure.Cosmos.Query.Core; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Tests.Pagination; + using Microsoft.Azure.Cosmos.Tracing; + using Microsoft.Azure.Documents; + using Microsoft.VisualStudio.TestTools.UnitTesting; + + [TestClass] + public class AggressivePrefetchPipelineTests + { + public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(3); + + private static readonly TimeSpan PollingInterval = TimeSpan.FromMilliseconds(25); + + [TestMethod] + [Owner("ndeshpan")] + public async Task BasicTests() + { + + } + + private AggressivePrefetchTestCase MakeTest(string query, IReadOnlyList documents, MonadicQueryDelegate queryDelegate, CosmosElement expectedDocument) + { + return new AggressivePrefetchTestCase(query, documents, queryDelegate, expectedDocument); + } + + private struct AggressivePrefetchTestCase + { + public string Query { get; } + + public IReadOnlyList Documents { get; } + + public int CountPartitions { get; } + + public MonadicQueryDelegate QueryDelegate { get; } + + public CosmosElement ExpectedDocument { get; } + + public AggressivePrefetchTestCase( + string query, + IReadOnlyList documents, + int countPartitions, + MonadicQueryDelegate queryDelegate, + CosmosElement expectedDocument) + { + this.Query = query ?? throw new ArgumentNullException(nameof(query)); + this.Documents = documents ?? throw new ArgumentNullException(nameof(documents)); + this.CountPartitions = countPartitions; + this.QueryDelegate = queryDelegate ?? throw new ArgumentNullException(nameof(queryDelegate)); + this.ExpectedDocument = expectedDocument ?? throw new ArgumentNullException(nameof(expectedDocument)); + } + } + + private static async Task RunTest(AggressivePrefetchTestCase testCase) + { + CancellationTokenSource cts = new CancellationTokenSource(Timeout); + + MockQueryDataSource queryDataSource = new MockQueryDataSource( + testCase.QueryDelegate, + testCase.CountPartitions, + cts.Token); + + IMonadicDocumentContainer monadicDocumentContainer = new MockDocumentContainer( + FullPipelineTests.partitionKeyDefinition, + queryDataSource); + + IDocumentContainer documentContainer = await FullPipelineTests.CreateDocumentContainerAsync( + testCase.Documents, + monadicDocumentContainer, + testCase.CountPartitions); + + Task> resultTask = FullPipelineTests.ExecuteQueryAsync( + testCase.Query, + testCase.Documents, + documentContainer); + + while (queryDataSource.CountWaiters < testCase.CountPartitions && !cts.IsCancellationRequested) + { + await Task.Delay(PollingInterval); + } + + queryDataSource.Release(testCase.CountPartitions); + + IReadOnlyList actualDocuments = await resultTask; + actualDocuments.Should().HaveCount(1); + actualDocuments.First().Should().Be(testCase.ExpectedDocument); + } + + private delegate Task> MonadicQueryDelegate( + SqlQuerySpec sqlQuerySpec, + FeedRangeState feedRangeState, + QueryPaginationOptions queryPaginationOptions, + ITrace trace, + CancellationToken cancellationToken); + + private sealed class MockQueryDataSource : IMonadicQueryDataSource, IDisposable + { + private readonly SemaphoreSlim semaphore; + + private readonly CancellationToken cancellationToken; + + private readonly int maxConcurrency; + + private readonly int continuationCount; + + private bool disposedValue; + + public MonadicQueryDelegate QueryDelegate { get; } + + public int CountWaiters => this.maxConcurrency - this.semaphore.CurrentCount; + + public MockQueryDataSource( + MonadicQueryDelegate queryDelegate, + int continuationCount, + int maxConcurrency, + CancellationToken cancellationToken) + { + this.continuationCount = continuationCount; + this.maxConcurrency = maxConcurrency; + this.semaphore = new SemaphoreSlim(0, maxConcurrency); + this.cancellationToken = cancellationToken; + this.QueryDelegate = queryDelegate ?? throw new ArgumentNullException(nameof(queryDelegate)); + } + + public void Release(int count) + { + this.semaphore.Release(count); + } + + public async Task> MonadicQueryAsync( + SqlQuerySpec sqlQuerySpec, + FeedRangeState feedRangeState, + QueryPaginationOptions queryPaginationOptions, + ITrace trace, + CancellationToken cancellationToken) + { + await this.semaphore.WaitAsync(this.cancellationToken); + + Number64 continuationToken = feedRangeState.State.Value as Number64; + + return await this.QueryDelegate(sqlQuerySpec, feedRangeState, queryPaginationOptions, trace, cancellationToken); + } + + private void Dispose(bool disposing) + { + if (!this.disposedValue) + { + if (disposing) + { + this.semaphore.Dispose(); + } + + this.disposedValue = true; + } + } + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + this.Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } + + private sealed class MockDocumentContainer : InMemoryContainer + { + private readonly IMonadicQueryDataSource queryDataSource; + + public MockDocumentContainer(PartitionKeyDefinition partitionKeyDefinition, IMonadicQueryDataSource queryDataSource) + : base(partitionKeyDefinition) + { + this.queryDataSource = queryDataSource ?? throw new ArgumentNullException(nameof(queryDataSource)); + } + + public override Task> MonadicQueryAsync( + SqlQuerySpec sqlQuerySpec, + FeedRangeState feedRangeState, + QueryPaginationOptions queryPaginationOptions, + ITrace trace, + CancellationToken cancellationToken) + { + return this.queryDataSource.MonadicQueryAsync(sqlQuerySpec, feedRangeState, queryPaginationOptions, trace, cancellationToken); + } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs index d3507c7f4b..9f63ec45c7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs @@ -25,7 +25,7 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline [TestClass] public class FullPipelineTests { - private static readonly PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition() + internal static readonly PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition() { Paths = new Collection() { @@ -250,7 +250,7 @@ public async Task Tracing() Assert.AreEqual(numTraces, rootTrace.Children.Count); } - private static async Task> ExecuteQueryAsync( + internal static async Task> ExecuteQueryAsync( string query, IReadOnlyList documents, IDocumentContainer documentContainer = null, @@ -327,10 +327,16 @@ private static async Task> DrainWithStateAsync(string query, return elements; } - private static async Task CreateDocumentContainerAsync( + internal static Task CreateDocumentContainerAsync( IReadOnlyList documents, int numPartitions = 3, FlakyDocumentContainer.FailureConfigs failureConfigs = null) + { + IMonadicDocumentContainer monadicDocumentContainer = CreateMonadicDocumentContainerAsync(failureConfigs); + return CreateDocumentContainerAsync(documents, monadicDocumentContainer, numPartitions); + } + + internal static IMonadicDocumentContainer CreateMonadicDocumentContainerAsync(FlakyDocumentContainer.FailureConfigs failureConfigs) { IMonadicDocumentContainer monadicDocumentContainer = new InMemoryContainer(partitionKeyDefinition); if (failureConfigs != null) @@ -338,6 +344,14 @@ private static async Task CreateDocumentContainerAsync( monadicDocumentContainer = new FlakyDocumentContainer(monadicDocumentContainer, failureConfigs); } + return monadicDocumentContainer; + } + + internal static async Task CreateDocumentContainerAsync( + IReadOnlyList documents, + IMonadicDocumentContainer monadicDocumentContainer, + int numPartitions) + { DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer); for (int i = 0; i < numPartitions; i++) @@ -373,7 +387,7 @@ private static async Task CreateDocumentContainerAsync( private static async Task CreatePipelineAsync( IDocumentContainer documentContainer, string query, - int pageSize = 10, + int pageSize, CosmosElement state = null) { IReadOnlyList feedRanges = await documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default); From a95b45bdaa6a5d841e5aba488f03216e1b4419b5 Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Thu, 5 May 2022 11:53:00 -0700 Subject: [PATCH 6/8] Add unit tests for aggressive prefetching --- .../AggressivePrefetchPipelineTests.cs | 195 +++++++++++------- .../Query/Pipeline/FullPipelineTests.cs | 24 ++- 2 files changed, 135 insertions(+), 84 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs index 4e30eba614..a62a4420c6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs @@ -11,10 +11,12 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline using System.Threading.Tasks; using FluentAssertions; using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.CosmosElements.Numbers; using Microsoft.Azure.Cosmos.Pagination; using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Query.Core.Pipeline.Pagination; + using Microsoft.Azure.Cosmos.Query.Core.QueryClient; using Microsoft.Azure.Cosmos.Tests.Pagination; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.Azure.Documents; @@ -23,45 +25,79 @@ namespace Microsoft.Azure.Cosmos.Tests.Query.Pipeline [TestClass] public class AggressivePrefetchPipelineTests { - public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(3); + public static readonly TimeSpan Timeout = TimeSpan.FromSeconds(5); + + private const int DocumentCount = 500; private static readonly TimeSpan PollingInterval = TimeSpan.FromMilliseconds(25); + private static readonly IReadOnlyList Documents = Enumerable + .Range(1, DocumentCount) + .Select(x => CosmosObject.Parse($"{{\"pk\" : {x} }}")) + .ToList(); + [TestMethod] [Owner("ndeshpan")] public async Task BasicTests() { - + AggressivePrefetchTestCase[] testCases = new[] + { + MakeTest( + query: "SELECT VALUE COUNT(1) FROM c", + continuationCount: 3, + partitionCount: 3, + expectedDocument: CosmosNumber64.Create(DocumentCount)), + MakeTest( + query: "SELECT VALUE MAX(c.pk) FROM c", + continuationCount: 3, + partitionCount: 3, + expectedDocument: CosmosNumber64.Create(DocumentCount)), + MakeTest( + query: "SELECT VALUE MIN(c.pk) FROM c", + continuationCount: 3, + partitionCount: 3, + expectedDocument: CosmosNumber64.Create(1)), + MakeTest( + query: "SELECT VALUE SUM(1) FROM c", + continuationCount: 3, + partitionCount: 3, + expectedDocument: CosmosNumber64.Create(DocumentCount)) + }; + + foreach(AggressivePrefetchTestCase testCase in testCases) + { + await RunTest(testCase); + } } - private AggressivePrefetchTestCase MakeTest(string query, IReadOnlyList documents, MonadicQueryDelegate queryDelegate, CosmosElement expectedDocument) + private static AggressivePrefetchTestCase MakeTest(string query, int continuationCount, int partitionCount, CosmosElement expectedDocument) { - return new AggressivePrefetchTestCase(query, documents, queryDelegate, expectedDocument); + return new AggressivePrefetchTestCase( + query: query, + continuationCount: continuationCount, + partitionCount: partitionCount, + expectedDocument: expectedDocument); } private struct AggressivePrefetchTestCase { public string Query { get; } - public IReadOnlyList Documents { get; } + public int ContinuationCount { get; } - public int CountPartitions { get; } - - public MonadicQueryDelegate QueryDelegate { get; } + public int PartitionCount { get; } public CosmosElement ExpectedDocument { get; } public AggressivePrefetchTestCase( string query, - IReadOnlyList documents, - int countPartitions, - MonadicQueryDelegate queryDelegate, + int continuationCount, + int partitionCount, CosmosElement expectedDocument) { this.Query = query ?? throw new ArgumentNullException(nameof(query)); - this.Documents = documents ?? throw new ArgumentNullException(nameof(documents)); - this.CountPartitions = countPartitions; - this.QueryDelegate = queryDelegate ?? throw new ArgumentNullException(nameof(queryDelegate)); + this.ContinuationCount = continuationCount; + this.PartitionCount = partitionCount; this.ExpectedDocument = expectedDocument ?? throw new ArgumentNullException(nameof(expectedDocument)); } } @@ -70,45 +106,43 @@ private static async Task RunTest(AggressivePrefetchTestCase testCase) { CancellationTokenSource cts = new CancellationTokenSource(Timeout); - MockQueryDataSource queryDataSource = new MockQueryDataSource( - testCase.QueryDelegate, - testCase.CountPartitions, - cts.Token); + int maxConcurrency = Convert.ToInt32(Math.Pow(2, testCase.PartitionCount)); - IMonadicDocumentContainer monadicDocumentContainer = new MockDocumentContainer( - FullPipelineTests.partitionKeyDefinition, - queryDataSource); + using MockDocumentContainer monadicDocumentContainer = new MockDocumentContainer( + partitionKeyDefinition: FullPipelineTests.partitionKeyDefinition, + continuationCount: testCase.ContinuationCount, + maxConcurrency: maxConcurrency, + cancellationToken: cts.Token); IDocumentContainer documentContainer = await FullPipelineTests.CreateDocumentContainerAsync( - testCase.Documents, + Documents, monadicDocumentContainer, - testCase.CountPartitions); + testCase.PartitionCount); + + IReadOnlyList feedRanges = await documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cts.Token); + Assert.AreEqual(maxConcurrency, feedRanges.Count); - Task> resultTask = FullPipelineTests.ExecuteQueryAsync( + Task> resultTask = FullPipelineTests.DrainWithoutStateAsync( testCase.Query, - testCase.Documents, - documentContainer); + documentContainer, + pageSize: 10); - while (queryDataSource.CountWaiters < testCase.CountPartitions && !cts.IsCancellationRequested) + for (int i = 0; i < testCase.ContinuationCount; i++) { - await Task.Delay(PollingInterval); - } + while (monadicDocumentContainer.CountWaiters < maxConcurrency && !cts.IsCancellationRequested) + { + await Task.Delay(PollingInterval); + } - queryDataSource.Release(testCase.CountPartitions); + monadicDocumentContainer.Release(maxConcurrency); + } IReadOnlyList actualDocuments = await resultTask; actualDocuments.Should().HaveCount(1); actualDocuments.First().Should().Be(testCase.ExpectedDocument); } - private delegate Task> MonadicQueryDelegate( - SqlQuerySpec sqlQuerySpec, - FeedRangeState feedRangeState, - QueryPaginationOptions queryPaginationOptions, - ITrace trace, - CancellationToken cancellationToken); - - private sealed class MockQueryDataSource : IMonadicQueryDataSource, IDisposable + private sealed class MockDocumentContainer : InMemoryContainer, IDisposable { private readonly SemaphoreSlim semaphore; @@ -120,29 +154,21 @@ private sealed class MockQueryDataSource : IMonadicQueryDataSource, IDisposable private bool disposedValue; - public MonadicQueryDelegate QueryDelegate { get; } - public int CountWaiters => this.maxConcurrency - this.semaphore.CurrentCount; - public MockQueryDataSource( - MonadicQueryDelegate queryDelegate, + public MockDocumentContainer( + PartitionKeyDefinition partitionKeyDefinition, int continuationCount, int maxConcurrency, CancellationToken cancellationToken) + : base(partitionKeyDefinition) { this.continuationCount = continuationCount; this.maxConcurrency = maxConcurrency; this.semaphore = new SemaphoreSlim(0, maxConcurrency); this.cancellationToken = cancellationToken; - this.QueryDelegate = queryDelegate ?? throw new ArgumentNullException(nameof(queryDelegate)); - } - - public void Release(int count) - { - this.semaphore.Release(count); } - - public async Task> MonadicQueryAsync( + public override async Task> MonadicQueryAsync( SqlQuerySpec sqlQuerySpec, FeedRangeState feedRangeState, QueryPaginationOptions queryPaginationOptions, @@ -151,9 +177,53 @@ public async Task> MonadicQueryAsync( { await this.semaphore.WaitAsync(this.cancellationToken); - Number64 continuationToken = feedRangeState.State.Value as Number64; + int count = ParseQueryState(feedRangeState.State); + + QueryState continuationToken = count < this.continuationCount ? CreateQueryState(++count) : default; + QueryPage page = new QueryPage( + documents: new List { }, + requestCharge: 3.0, + activityId: "E7980B1F-436E-44DF-B7A5-655C56D38648", + responseLengthInBytes: 48, + cosmosQueryExecutionInfo: new Lazy(() => new CosmosQueryExecutionInfo(false, false)), + disallowContinuationTokenMessage: null, + additionalHeaders: null, + state: continuationToken); + + return continuationToken != default ? + TryCatch.FromResult(page) : + await base.MonadicQueryAsync( + sqlQuerySpec, + new FeedRangeState(feedRangeState.FeedRange, default), + queryPaginationOptions, + trace, + cancellationToken); + } + + public void Release(int count) + { + this.semaphore.Release(count); + } + + private static int ParseQueryState(QueryState state) + { + if (state == default) return 0; - return await this.QueryDelegate(sqlQuerySpec, feedRangeState, queryPaginationOptions, trace, cancellationToken); + CosmosObject parsedContinuationToken = CosmosObject.Parse(((CosmosString)state.Value).Value); + int continuationCount = (int)Number64.ToLong(((CosmosNumber64)parsedContinuationToken["continuationCount"]).Value); + return continuationCount; + } + + private static QueryState CreateQueryState(int count) + { + return new QueryState( + CosmosString.Create( + CosmosObject.Create( + new Dictionary() + { + { "continuationCount", CosmosNumber64.Create(++count) }, + }) + .ToString())); } private void Dispose(bool disposing) @@ -176,26 +246,5 @@ public void Dispose() GC.SuppressFinalize(this); } } - - private sealed class MockDocumentContainer : InMemoryContainer - { - private readonly IMonadicQueryDataSource queryDataSource; - - public MockDocumentContainer(PartitionKeyDefinition partitionKeyDefinition, IMonadicQueryDataSource queryDataSource) - : base(partitionKeyDefinition) - { - this.queryDataSource = queryDataSource ?? throw new ArgumentNullException(nameof(queryDataSource)); - } - - public override Task> MonadicQueryAsync( - SqlQuerySpec sqlQuerySpec, - FeedRangeState feedRangeState, - QueryPaginationOptions queryPaginationOptions, - ITrace trace, - CancellationToken cancellationToken) - { - return this.queryDataSource.MonadicQueryAsync(sqlQuerySpec, feedRangeState, queryPaginationOptions, trace, cancellationToken); - } - } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs index 9f63ec45c7..61ce694ef6 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs @@ -185,13 +185,12 @@ public async Task Aggregates() } [TestMethod] - [Ignore("[TODO]: ndeshpan enable after ServiceInterop.dll is refreshed")] public async Task DCount() { List documents = new List(); for (int i = 0; i < 250; i++) { - documents.Add(CosmosObject.Parse($"{{\"pk\" : {i}, \"val\": {i % 50} }}")); + documents.Add(CosmosObject.Parse($"{{\"pk\" : {i}, \"val\": {i % 49} }}")); } List documentsQueried = await ExecuteQueryAsync( @@ -201,7 +200,7 @@ public async Task DCount() Assert.AreEqual(expected: 1, actual: documentsQueried.Count); Assert.IsTrue(documentsQueried[0] is CosmosNumber); CosmosNumber result = documentsQueried[0] as CosmosNumber; - Assert.AreEqual(expected: 50, actual: result); + Assert.AreEqual(expected: 49, actual: Number64.ToLong(result.Value)); } [TestMethod] @@ -253,14 +252,17 @@ public async Task Tracing() internal static async Task> ExecuteQueryAsync( string query, IReadOnlyList documents, - IDocumentContainer documentContainer = null, int pageSize = 10) { - if (documentContainer == null) - { - documentContainer = await CreateDocumentContainerAsync(documents); - } + IDocumentContainer documentContainer = await CreateDocumentContainerAsync(documents); + return await ExecuteQueryAsync(query, documentContainer, pageSize); + } + internal static async Task> ExecuteQueryAsync( + string query, + IDocumentContainer documentContainer, + int pageSize) + { List resultsFromDrainWithoutState = await DrainWithoutStateAsync(query, documentContainer, pageSize); List resultsFromDrainWithState = await DrainWithStateAsync(query, documentContainer, pageSize); @@ -285,7 +287,7 @@ public async Task Fuzz() Assert.AreEqual(expected: 249, actual: documentsQueried.Count); } - private static async Task> DrainWithoutStateAsync(string query, IDocumentContainer documentContainer, int pageSize = 10) + internal static async Task> DrainWithoutStateAsync(string query, IDocumentContainer documentContainer, int pageSize) { IQueryPipelineStage pipelineStage = await CreatePipelineAsync(documentContainer, query, pageSize); @@ -301,7 +303,7 @@ private static async Task> DrainWithoutStateAsync(string que return elements; } - private static async Task> DrainWithStateAsync(string query, IDocumentContainer documentContainer, int pageSize = 10) + private static async Task> DrainWithStateAsync(string query, IDocumentContainer documentContainer, int pageSize) { IQueryPipelineStage pipelineStage; CosmosElement state = null; @@ -393,7 +395,7 @@ private static async Task CreatePipelineAsync( IReadOnlyList feedRanges = await documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default); TryCatch tryCreatePipeline = PipelineFactory.MonadicCreate( - ExecutionEnvironment.Compute, + ExecutionEnvironment.Client, documentContainer, new SqlQuerySpec(query), feedRanges, From 01e2bba3c3ab1491dfb2856f0b072d7a4938acdb Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Thu, 5 May 2022 14:29:59 -0700 Subject: [PATCH 7/8] Add some more unit test coverage --- .../BufferedPartitionRangeEnumeratorTests.cs | 169 +++++++++++------- ...sPartitionPartitionRangeEnumeratorTests.cs | 14 +- .../PartitionRangeEnumeratorTests.cs | 19 +- ...ePartitionPartitionRangeEnumeratorTests.cs | 20 ++- ...yPartitionRangePageAsyncEnumeratorTests.cs | 12 +- .../QueryPartitionRangePageEnumeratorTests.cs | 8 +- 6 files changed, 155 insertions(+), 87 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs index f15d49cc86..e22a22c80c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs @@ -30,17 +30,21 @@ public async Task Test429sWithContinuationsAsync() } [TestMethod] - public async Task TestDrainFullyAsync() + [DataRow(false)] + [DataRow(true)] + public async Task TestDrainFullyAsync(bool aggressivePrefetch) { Implementation implementation = new Implementation(); - await implementation.TestDrainFullyAsync(); + await implementation.TestDrainFullyAsync(aggressivePrefetch); } [TestMethod] - public async Task TestEmptyPages() + [DataRow(false)] + [DataRow(true)] + public async Task TestEmptyPages(bool aggressivePrefetch) { Implementation implementation = new Implementation(); - await implementation.TestEmptyPages(); + await implementation.TestEmptyPages(aggressivePrefetch); } [TestMethod] @@ -51,30 +55,42 @@ public async Task TestResumingFromStateAsync() } [TestMethod] - public async Task TestSplitAsync() + [DataRow(false, false)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(true, true)] + public async Task TestSplitAsync(bool aggressivePrefetch, bool exercisePrefetch) { Implementation implementation = new Implementation(); - await implementation.TestSplitAsync(); + await implementation.TestSplitAsync(aggressivePrefetch, exercisePrefetch); } [TestMethod] - public async Task TestBufferPageAsync() + [DataRow(false, false)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(true, true)] + public async Task TestBufferPageAsync(bool aggressivePrefetch, bool exercisePrefetch) { Implementation implementation = new Implementation(); - await implementation.TestBufferPageAsync(); + await implementation.TestBufferPageAsync(aggressivePrefetch, exercisePrefetch); } [TestMethod] - public async Task TestMoveNextAndBufferPageAsync() + [DataRow(false, false)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(true, true)] + public async Task TestMoveNextAndBufferPageAsync(bool aggressivePrefetch, bool exercisePrefetch) { Implementation implementation = new Implementation(); - await implementation.TestMoveNextAndBufferPageAsync(); + await implementation.TestMoveNextAndBufferPageAsync(aggressivePrefetch, exercisePrefetch); } [TestClass] private sealed class Implementation : PartitionRangeEnumeratorTests { - private static readonly int iterations = 1; + private const int Iterations = 1; public Implementation() : base(singlePartition: true) @@ -82,11 +98,11 @@ public Implementation() } [TestMethod] - public async Task TestSplitAsync() + public async Task TestSplitAsync(bool aggressivePrefetch, bool exercisePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection); + IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection, aggressivePrefetch); (HashSet parentIdentifiers, ReadFeedState state) = await this.PartialDrainAsync(enumerator, numIterations: 3); @@ -95,9 +111,9 @@ public async Task TestSplitAsync() // Try To read from the partition that is gone. await enumerator.MoveNextAsync(); - Assert.IsTrue(enumerator.Current.Failed); + Assert.IsTrue((aggressivePrefetch && exercisePrefetch) || enumerator.Current.Failed); - // Resume on the children using the parent continuaiton token + // Resume on the children using the parent continuation token HashSet childIdentifiers = new HashSet(); foreach (int partitionKeyRangeId in new int[] { 1, 2 }) { @@ -111,6 +127,7 @@ public async Task TestSplitAsync() readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), cancellationToken: default), trace: NoOpTrace.Singleton); + HashSet resourceIdentifiers = await this.DrainFullyAsync(enumerable); childIdentifiers.UnionWith(resourceIdentifiers); @@ -120,19 +137,29 @@ public async Task TestSplitAsync() } [TestMethod] - public async Task TestBufferPageAsync() + public async Task TestBufferPageAsync(bool aggressivePrefetch, bool exercisePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - BufferedPartitionRangePageAsyncEnumerator enumerator = new BufferedPartitionRangePageAsyncEnumerator( - new ReadFeedPartitionRangeEnumerator( - inMemoryCollection, - feedRangeState: new FeedRangeState( - new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), - ReadFeedState.Beginning()), - readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), - cancellationToken: default), - cancellationToken: default); + BufferedPartitionRangePageAsyncEnumeratorBase enumerator = aggressivePrefetch ? + new FullyBufferedPartitionRangeAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), + cancellationToken: default) : + new BufferedPartitionRangePageAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), + cancellationToken: default); int count = 0; @@ -142,11 +169,10 @@ public async Task TestBufferPageAsync() await enumerator.PrefetchAsync(trace: NoOpTrace.Singleton, default); } - Random random = new Random(); while (await enumerator.MoveNextAsync(NoOpTrace.Singleton)) { count += enumerator.Current.Result.GetRecords().Count; - if (random.Next() % 2 == 0) + if (exercisePrefetch) { for (int i = 0; i < 10; i++) { @@ -159,26 +185,34 @@ public async Task TestBufferPageAsync() Assert.AreEqual(numItems, count); } - [TestMethod] - public async Task TestMoveNextAndBufferPageAsync() + public async Task TestMoveNextAndBufferPageAsync(bool aggressivePrefetch, bool exercisePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - Random random = new Random(); - for (int iteration = 0; iteration < iterations; iteration++) + for (int iteration = 0; iteration < Iterations; iteration++) { - BufferedPartitionRangePageAsyncEnumerator enumerator = new BufferedPartitionRangePageAsyncEnumerator( - new ReadFeedPartitionRangeEnumerator( - inMemoryCollection, - feedRangeState: new FeedRangeState( - new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), - ReadFeedState.Beginning()), - readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), - cancellationToken: default), - cancellationToken: default); + BufferedPartitionRangePageAsyncEnumeratorBase enumerator = aggressivePrefetch ? + new FullyBufferedPartitionRangeAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), + cancellationToken: default) : + new BufferedPartitionRangePageAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), + cancellationToken: default); - if ((random.Next() % 2) == 0) + if (exercisePrefetch) { await enumerator.PrefetchAsync(trace: NoOpTrace.Singleton, default); } @@ -187,8 +221,8 @@ public async Task TestMoveNextAndBufferPageAsync() while (await enumerator.MoveNextAsync(NoOpTrace.Singleton)) { count += enumerator.Current.Result.GetRecords().Count; - - if ((random.Next() % 2) == 0) + + if (exercisePrefetch) { await enumerator.PrefetchAsync(trace: NoOpTrace.Singleton, default); } @@ -205,27 +239,49 @@ public override IReadOnlyList GetRecordsFromPage(ReadFeedPage page) protected override IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, + bool aggressivePrefetch = false, ReadFeedState state = null) { return new PartitionRangePageAsyncEnumerable( feedRangeState: new FeedRangeState( new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), state ?? ReadFeedState.Beginning()), - (feedRangeState) => new BufferedPartitionRangePageAsyncEnumerator( - new ReadFeedPartitionRangeEnumerator( - documentContainer, - feedRangeState: feedRangeState, - readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + (feedRangeState) => aggressivePrefetch ? + new FullyBufferedPartitionRangeAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + documentContainer, + feedRangeState: feedRangeState, + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), + cancellationToken: default) : + new BufferedPartitionRangePageAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + documentContainer, + feedRangeState: feedRangeState, + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), cancellationToken: default), - cancellationToken: default), trace: NoOpTrace.Singleton); } - public override IAsyncEnumerator> CreateEnumerator( - IDocumentContainer inMemoryCollection, ReadFeedState state = null, CancellationToken cancellationToken =default) + protected override IAsyncEnumerator> CreateEnumerator( + IDocumentContainer inMemoryCollection, + bool aggressivePrefetch = false, + ReadFeedState state = null, + CancellationToken cancellationToken = default) { return new TracingAsyncEnumerator>( - enumerator: new BufferedPartitionRangePageAsyncEnumerator( + enumerator: aggressivePrefetch ? + new FullyBufferedPartitionRangeAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + state ?? ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: cancellationToken), + cancellationToken: cancellationToken) : + new BufferedPartitionRangePageAsyncEnumerator( new ReadFeedPartitionRangeEnumerator( inMemoryCollection, feedRangeState: new FeedRangeState( @@ -236,15 +292,6 @@ public override IAsyncEnumerator> CreateEnumerator( cancellationToken: cancellationToken), trace: NoOpTrace.Singleton); } - - private async Task BufferMoreInBackground(BufferedPartitionRangePageAsyncEnumerator enumerator) - { - while (true) - { - await enumerator.PrefetchAsync(trace: NoOpTrace.Singleton, default); - await Task.Delay(10); - } - } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs index 3545eb0495..5c725eea35 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs @@ -38,10 +38,12 @@ public async Task Test429sWithContinuationsAsync() } [TestMethod] - public async Task TestEmptyPages() + [DataRow(false)] + [DataRow(true)] + public async Task TestEmptyPages(bool aggressivePrefetch) { Implementation implementation = new Implementation(false); - await implementation.TestEmptyPages(); + await implementation.TestEmptyPages(aggressivePrefetch); } [TestMethod] @@ -326,7 +328,7 @@ public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allow break; } - enumerator = this.CreateEnumerator(inMemoryCollection, tryGetPage.Result.State); + enumerator = this.CreateEnumerator(inMemoryCollection, aggressivePrefetch, tryGetPage.Result.State); } if (random.Next() % 2 == 0) @@ -365,6 +367,7 @@ public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allow protected override IAsyncEnumerable>> CreateEnumerable( IDocumentContainer inMemoryCollection, + bool aggressivePrefetch = false, CrossFeedRangeState state = null) { PartitionRangePageAsyncEnumerator createEnumerator( @@ -379,7 +382,7 @@ PartitionRangePageAsyncEnumerator createEnumerator( createPartitionRangeEnumerator: createEnumerator, comparer: PartitionRangePageAsyncEnumeratorComparer.Singleton, maxConcurrency: 10, - prefetchPolicy: PrefetchPolicy.PrefetchSinglePage, + prefetchPolicy: aggressivePrefetch ? PrefetchPolicy.PrefetchAll : PrefetchPolicy.PrefetchSinglePage, trace: NoOpTrace.Singleton, state: state ?? new CrossFeedRangeState( new FeedRangeState[] @@ -388,8 +391,9 @@ PartitionRangePageAsyncEnumerator createEnumerator( })); } - public override IAsyncEnumerator>> CreateEnumerator( + protected override IAsyncEnumerator>> CreateEnumerator( IDocumentContainer inMemoryCollection, + bool aggressivePrefetch = false, CrossFeedRangeState state = null, CancellationToken cancellationToken = default) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs index b6e8b5bc4e..bfa740de42 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs @@ -33,11 +33,11 @@ public async Task TestMoveNextAsyncThrowsTaskCanceledException() } [TestMethod] - public async Task TestDrainFullyAsync() + public async Task TestDrainFullyAsync(bool aggressivePrefetch) { int numItems = 1000; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection); + IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, aggressivePrefetch); HashSet identifiers = await this.DrainFullyAsync(enumerable); Assert.AreEqual(numItems, identifiers.Count); } @@ -51,7 +51,7 @@ public async Task TestResumingFromStateAsync() IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection); (HashSet firstDrainResults, TState state) = await this.PartialDrainAsync(enumerator, numIterations: 3); - IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, state); + IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, false, state); HashSet secondDrainResults = await this.DrainFullyAsync(enumerable); Assert.AreEqual(numItems, firstDrainResults.Count + secondDrainResults.Count); @@ -130,7 +130,7 @@ public async Task Test429sWithContinuationsAsync() } // Create a new enumerator from that state to simulate when the user want's to start resume later from a continuation token. - enumerator = this.CreateEnumerator(inMemoryCollection, state); + enumerator = this.CreateEnumerator(inMemoryCollection, false, state); } else { @@ -148,7 +148,7 @@ public async Task Test429sWithContinuationsAsync() } [TestMethod] - public async Task TestEmptyPages() + public async Task TestEmptyPages(bool aggressivePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync( @@ -156,7 +156,7 @@ public async Task TestEmptyPages() new FlakyDocumentContainer.FailureConfigs( inject429s: false, injectEmptyPages: true)); - IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection); + IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, aggressivePrefetch); HashSet identifiers = await this.DrainFullyAsync(enumerable); Assert.AreEqual(numItems, identifiers.Count); } @@ -165,9 +165,14 @@ public async Task TestEmptyPages() protected abstract IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, + bool aggressivePrefetch = false, TState state = null); - public abstract IAsyncEnumerator> CreateEnumerator(IDocumentContainer documentContainer, TState state = null, CancellationToken cancellationToken= default); + protected abstract IAsyncEnumerator> CreateEnumerator( + IDocumentContainer inMemoryCollection, + bool aggressivePrefetch = false, + TState state = null, + CancellationToken cancellationToken = default); public async Task CreateDocumentContainerAsync( int numItems, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs index 2c3e1b3212..02b99ddbe3 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs @@ -36,14 +36,14 @@ public async Task Test429sWithContinuationsAsync() public async Task TestDrainFullyAsync() { Implementation implementation = new Implementation(); - await implementation.TestDrainFullyAsync(); + await implementation.TestDrainFullyAsync(false); } [TestMethod] public async Task TestEmptyPages() { Implementation implementation = new Implementation(); - await implementation.TestEmptyPages(); + await implementation.TestEmptyPages(false); } [TestMethod] @@ -124,6 +124,7 @@ public override IReadOnlyList GetRecordsFromPage(ReadFeedPage page) protected override IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, + bool aggressivePrefetch = false, ReadFeedState state = null) { return new PartitionRangePageAsyncEnumerable( @@ -131,15 +132,18 @@ protected override IAsyncEnumerable> CreateEnumerable( new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), state ?? ReadFeedState.Beginning()), (feedRangeState) => new ReadFeedPartitionRangeEnumerator( - documentContainer, - feedRangeState: feedRangeState, - readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), - cancellationToken: default), + documentContainer, + feedRangeState: feedRangeState, + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: default), trace: NoOpTrace.Singleton); } - public override IAsyncEnumerator> CreateEnumerator( - IDocumentContainer inMemoryCollection, ReadFeedState state = null, CancellationToken cancellationToken = default) + protected override IAsyncEnumerator> CreateEnumerator( + IDocumentContainer inMemoryCollection, + bool aggressivePrefetch = false, + ReadFeedState state = null, + CancellationToken cancellationToken = default) { return new TracingAsyncEnumerator>( new ReadFeedPartitionRangeEnumerator( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs index 2055f748d2..847d5cbb1d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs @@ -36,13 +36,19 @@ public override IReadOnlyList GetRecordsFromPage(OrderByQueryPage page) throw new NotImplementedException(); } - protected override IAsyncEnumerable> CreateEnumerable(IDocumentContainer documentContainer, QueryState state = null) + protected override IAsyncEnumerable> CreateEnumerable( + IDocumentContainer documentContainer, + bool aggressivePrefetch = false, + QueryState state = null) { throw new NotImplementedException(); } - public override IAsyncEnumerator> CreateEnumerator( - IDocumentContainer documentContainer, QueryState state = null, CancellationToken cancellationToken = default) + protected override IAsyncEnumerator> CreateEnumerator( + IDocumentContainer documentContainer, + bool aggressivePrefetch = false, + QueryState state = null, + CancellationToken cancellationToken = default) { List ranges = documentContainer.GetFeedRangesAsync( trace: NoOpTrace.Singleton, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs index b161d1cc50..c1ca0b8faa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs @@ -36,14 +36,14 @@ public async Task Test429sWithContinuationsAsync() public async Task TestDrainFullyAsync() { Implementation implementation = new Implementation(); - await implementation.TestDrainFullyAsync(); + await implementation.TestDrainFullyAsync(false); } [TestMethod] public async Task TestEmptyPages() { Implementation implementation = new Implementation(); - await implementation.TestEmptyPages(); + await implementation.TestEmptyPages(false); } [TestMethod] @@ -129,6 +129,7 @@ public override IReadOnlyList GetRecordsFromPage(QueryPage page) protected override IAsyncEnumerable> CreateEnumerable( IDocumentContainer documentContainer, + bool aggressivePrefetch = false, QueryState state = null) { List ranges = documentContainer.GetFeedRangesAsync( @@ -147,8 +148,9 @@ protected override IAsyncEnumerable> CreateEnumerable( trace: NoOpTrace.Singleton); } - public override IAsyncEnumerator> CreateEnumerator( + protected override IAsyncEnumerator> CreateEnumerator( IDocumentContainer documentContainer, + bool aggressivePrefetch = false, QueryState state = default, CancellationToken cancellationToken = default) { From 4e15e4f85aa74900a1275847d978d7922e754bba Mon Sep 17 00:00:00 2001 From: Neil Deshpande Date: Thu, 5 May 2022 16:45:42 -0700 Subject: [PATCH 8/8] Incorporate code review feedback and add more test coverage --- ...lyBufferedPartitionRangeAsyncEnumerator.cs | 17 ++-- .../Query/Core/Pipeline/PipelineFactory.cs | 6 +- .../BufferedPartitionRangeEnumeratorTests.cs | 91 ++++++++++++------- ...sPartitionPartitionRangeEnumeratorTests.cs | 33 ++++--- .../PartitionRangeEnumeratorTests.cs | 30 ++++-- ...ePartitionPartitionRangeEnumeratorTests.cs | 13 ++- ...yPartitionRangePageAsyncEnumeratorTests.cs | 8 +- .../AggressivePrefetchPipelineTests.cs | 1 + .../QueryPartitionRangePageEnumeratorTests.cs | 16 ++-- 9 files changed, 135 insertions(+), 80 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs index 8fe9ec6855..d9da53e15c 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/FullyBufferedPartitionRangeAsyncEnumerator.cs @@ -16,18 +16,16 @@ internal sealed class FullyBufferedPartitionRangeAsyncEnumerator { private readonly PartitionRangePageAsyncEnumerator enumerator; private readonly List bufferedPages; - private int index; + private int currentIndex; private Exception exception; - private bool HasPrefetched => this.exception != null || this.bufferedPages.Count > 0; + private bool HasPrefetched => (this.exception != null) || (this.bufferedPages.Count > 0); public FullyBufferedPartitionRangeAsyncEnumerator(PartitionRangePageAsyncEnumerator enumerator, CancellationToken cancellationToken) : base(enumerator.FeedRangeState, cancellationToken) { this.enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator)); this.bufferedPages = new List(); - this.index = 0; - this.exception = null; } public override ValueTask DisposeAsync() @@ -51,7 +49,7 @@ public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken ca using (ITrace prefetchTrace = trace.StartChild("Prefetch", TraceComponent.Pagination, TraceLevel.Info)) { - while (this.exception == null && await this.enumerator.MoveNextAsync(prefetchTrace)) + while (await this.enumerator.MoveNextAsync(prefetchTrace)) { cancellationToken.ThrowIfCancellationRequested(); TryCatch current = this.enumerator.Current; @@ -62,6 +60,7 @@ public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken ca else { this.exception = current.Exception; + break; } } } @@ -70,11 +69,11 @@ public override async ValueTask PrefetchAsync(ITrace trace, CancellationToken ca protected override async Task> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken) { TryCatch result; - if (this.index < this.bufferedPages.Count) + if (this.currentIndex < this.bufferedPages.Count) { - result = TryCatch.FromResult(this.bufferedPages[this.index]); + result = TryCatch.FromResult(this.bufferedPages[this.currentIndex]); } - else if (this.index == this.bufferedPages.Count && this.exception != null) + else if (this.currentIndex == this.bufferedPages.Count && this.exception != null) { result = TryCatch.FromException(this.exception); } @@ -84,7 +83,7 @@ protected override async Task> GetNextPageAsync(ITrace trace, Ca result = this.enumerator.Current; } - ++this.index; + ++this.currentIndex; return result; } diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs index 8a720bdb13..4cbf6400bb 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/PipelineFactory.cs @@ -188,10 +188,8 @@ private static PrefetchPolicy DeterminePrefetchPolicy(QueryInfo queryInfo) { return PrefetchPolicy.PrefetchAll; } - else - { - return PrefetchPolicy.PrefetchSinglePage; - } + + return PrefetchPolicy.PrefetchSinglePage; } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs index e22a22c80c..62114fcdf9 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/BufferedPartitionRangeEnumeratorTests.cs @@ -16,17 +16,23 @@ public sealed class BufferedPartitionPartitionRangeEnumeratorTests { [TestMethod] - public async Task Test429sAsync() + [DataRow(false)] + [DataRow(true)] + public async Task Test429sAsync(bool aggressivePrefetch) { Implementation implementation = new Implementation(); - await implementation.Test429sAsync(); + await implementation.Test429sAsync(aggressivePrefetch); } [TestMethod] - public async Task Test429sWithContinuationsAsync() + [DataRow(false, false)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(true, true)] + public async Task Test429sWithContinuationsAsync(bool aggressivePrefetch, bool exercisePrefetch) { Implementation implementation = new Implementation(); - await implementation.Test429sWithContinuationsAsync(); + await implementation.Test429sWithContinuationsAsync(aggressivePrefetch, exercisePrefetch); } [TestMethod] @@ -48,10 +54,14 @@ public async Task TestEmptyPages(bool aggressivePrefetch) } [TestMethod] - public async Task TestResumingFromStateAsync() + [DataRow(false, false)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(true, true)] + public async Task TestResumingFromStateAsync(bool aggressivePrefetch, bool exercisePrefetch) { Implementation implementation = new Implementation(); - await implementation.TestResumingFromStateAsync(); + await implementation.TestResumingFromStateAsync(aggressivePrefetch, exercisePrefetch); } [TestMethod] @@ -102,7 +112,7 @@ public async Task TestSplitAsync(bool aggressivePrefetch, bool exercisePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection, aggressivePrefetch); + IAsyncEnumerator> enumerator = await this.CreateEnumeratorAsync(inMemoryCollection, aggressivePrefetch, exercisePrefetch); (HashSet parentIdentifiers, ReadFeedState state) = await this.PartialDrainAsync(enumerator, numIterations: 3); @@ -242,11 +252,9 @@ protected override IAsyncEnumerable> CreateEnumerable( bool aggressivePrefetch = false, ReadFeedState state = null) { - return new PartitionRangePageAsyncEnumerable( - feedRangeState: new FeedRangeState( - new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), - state ?? ReadFeedState.Beginning()), - (feedRangeState) => aggressivePrefetch ? + PartitionRangePageAsyncEnumerator CreateBufferedEnumerator(FeedRangeState feedRangeState) + { + BufferedPartitionRangePageAsyncEnumeratorBase enumerator = aggressivePrefetch ? new FullyBufferedPartitionRangeAsyncEnumerator( new ReadFeedPartitionRangeEnumerator( documentContainer, @@ -260,36 +268,53 @@ protected override IAsyncEnumerable> CreateEnumerable( feedRangeState: feedRangeState, readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), cancellationToken: default), - cancellationToken: default), + cancellationToken: default); + + return enumerator; + }; + + return new PartitionRangePageAsyncEnumerable( + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + state ?? ReadFeedState.Beginning()), + createPartitionRangeEnumerator: CreateBufferedEnumerator, trace: NoOpTrace.Singleton); } - protected override IAsyncEnumerator> CreateEnumerator( + protected async override Task>> CreateEnumeratorAsync( IDocumentContainer inMemoryCollection, bool aggressivePrefetch = false, + bool exercisePrefetch = false, ReadFeedState state = null, CancellationToken cancellationToken = default) { + BufferedPartitionRangePageAsyncEnumeratorBase enumerator = aggressivePrefetch ? + new FullyBufferedPartitionRangeAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + state ?? ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: cancellationToken), + cancellationToken: cancellationToken) : + new BufferedPartitionRangePageAsyncEnumerator( + new ReadFeedPartitionRangeEnumerator( + inMemoryCollection, + feedRangeState: new FeedRangeState( + new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), + state ?? ReadFeedState.Beginning()), + readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), + cancellationToken: cancellationToken), + cancellationToken: cancellationToken); + + if (exercisePrefetch) + { + await enumerator.PrefetchAsync(NoOpTrace.Singleton, default); + } + return new TracingAsyncEnumerator>( - enumerator: aggressivePrefetch ? - new FullyBufferedPartitionRangeAsyncEnumerator( - new ReadFeedPartitionRangeEnumerator( - inMemoryCollection, - feedRangeState: new FeedRangeState( - new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), - state ?? ReadFeedState.Beginning()), - readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), - cancellationToken: cancellationToken), - cancellationToken: cancellationToken) : - new BufferedPartitionRangePageAsyncEnumerator( - new ReadFeedPartitionRangeEnumerator( - inMemoryCollection, - feedRangeState: new FeedRangeState( - new FeedRangePartitionKeyRange(partitionKeyRangeId: "0"), - state ?? ReadFeedState.Beginning()), - readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), - cancellationToken: cancellationToken), - cancellationToken: cancellationToken), + enumerator: enumerator, trace: NoOpTrace.Singleton); } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs index 5c725eea35..20338aa768 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs @@ -27,14 +27,14 @@ public sealed class CrossPartitionPartitionRangeEnumeratorTests public async Task Test429sAsync() { Implementation implementation = new Implementation(false); - await implementation.Test429sAsync(); + await implementation.Test429sAsync(false); } [TestMethod] public async Task Test429sWithContinuationsAsync() { Implementation implementation = new Implementation(false); - await implementation.Test429sWithContinuationsAsync(); + await implementation.Test429sWithContinuationsAsync(false, false); } [TestMethod] @@ -220,7 +220,7 @@ protected override Task> GetNextPageAsync(ITrace trace, C public async Task TestSplitAndMergeAsync(bool useState, bool allowSplits, bool allowMerges) { Implementation implementation = new Implementation(singlePartition: false); - await implementation.TestSplitAndMergeImplementationAsync(useState, allowSplits, allowMerges, false); + await implementation.TestSplitAndMergeImplementationAsync(useState, allowSplits, allowMerges); } private sealed class Implementation : PartitionRangeEnumeratorTests, CrossFeedRangeState> @@ -279,7 +279,7 @@ public async Task TestMergeToSinglePartition() cancellationToken: default); await this.DocumentContainer.SplitAsync(ranges.First(), cancellationToken: default); - IAsyncEnumerator>> enumerator = this.CreateEnumerator(this.DocumentContainer); + IAsyncEnumerator>> enumerator = await this.CreateEnumeratorAsync(this.DocumentContainer); List identifiers = new List(); int iteration = 0; while (await enumerator.MoveNextAsync()) @@ -303,11 +303,14 @@ public async Task TestMergeToSinglePartition() Assert.AreEqual(numItems, identifiers.Count); } - public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allowSplits, bool allowMerges, bool aggressivePrefetch) + public async Task TestSplitAndMergeImplementationAsync( + bool useState, + bool allowSplits, + bool allowMerges) { int numItems = 1000; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - IAsyncEnumerator>> enumerator = this.CreateEnumerator(inMemoryCollection); + IAsyncEnumerator>> enumerator = await this.CreateEnumeratorAsync(inMemoryCollection); HashSet identifiers = new HashSet(); Random random = new Random(); while (await enumerator.MoveNextAsync()) @@ -328,7 +331,11 @@ public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allow break; } - enumerator = this.CreateEnumerator(inMemoryCollection, aggressivePrefetch, tryGetPage.Result.State); + enumerator = await this.CreateEnumeratorAsync( + inMemoryCollection, + false, + false, + tryGetPage.Result.State); } if (random.Next() % 2 == 0) @@ -391,20 +398,24 @@ PartitionRangePageAsyncEnumerator createEnumerator( })); } - protected override IAsyncEnumerator>> CreateEnumerator( + protected override Task>>> CreateEnumeratorAsync( IDocumentContainer inMemoryCollection, bool aggressivePrefetch = false, + bool exercisePrefetch = false, CrossFeedRangeState state = null, CancellationToken cancellationToken = default) { PartitionRangePageAsyncEnumerator createEnumerator( - FeedRangeState feedRangeState) => new ReadFeedPartitionRangeEnumerator( + FeedRangeState feedRangeState) + { + return new ReadFeedPartitionRangeEnumerator( inMemoryCollection, feedRangeState: feedRangeState, readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), cancellationToken: default); + } - TracingAsyncEnumerator>> enumerator = new( + IAsyncEnumerator>> enumerator = new TracingAsyncEnumerator>>( new CrossPartitionRangePageAsyncEnumerator( feedRangeProvider: inMemoryCollection, createPartitionRangeEnumerator: createEnumerator, @@ -419,7 +430,7 @@ PartitionRangePageAsyncEnumerator createEnumerator( })), NoOpTrace.Singleton); - return enumerator; + return Task.FromResult(enumerator); } public override IReadOnlyList GetRecordsFromPage(CrossFeedRangePage page) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs index bfa740de42..594b449efa 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/PartitionRangeEnumeratorTests.cs @@ -27,7 +27,13 @@ public async Task TestMoveNextAsyncThrowsTaskCanceledException() int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); CancellationTokenSource cts = new CancellationTokenSource(); - IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection, cancellationToken: cts.Token); + IAsyncEnumerator> enumerator = await this.CreateEnumeratorAsync( + inMemoryCollection, + aggressivePrefetch: false, + exercisePrefetch: false, + state: null, + cancellationToken: cts.Token); + cts.Cancel(); await Assert.ThrowsExceptionAsync(async () => await enumerator.MoveNextAsync()); } @@ -43,22 +49,22 @@ public async Task TestDrainFullyAsync(bool aggressivePrefetch) } [TestMethod] - public async Task TestResumingFromStateAsync() + public async Task TestResumingFromStateAsync(bool aggressivePrefetch, bool exercisePrefetch) { int numItems = 1000; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems); - IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection); + IAsyncEnumerator> enumerator = await this.CreateEnumeratorAsync(inMemoryCollection, aggressivePrefetch, exercisePrefetch); (HashSet firstDrainResults, TState state) = await this.PartialDrainAsync(enumerator, numIterations: 3); - IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, false, state); + IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, aggressivePrefetch, state); HashSet secondDrainResults = await this.DrainFullyAsync(enumerable); Assert.AreEqual(numItems, firstDrainResults.Count + secondDrainResults.Count); } [TestMethod] - public async Task Test429sAsync() + public async Task Test429sAsync(bool aggressivePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync( @@ -67,7 +73,7 @@ public async Task Test429sAsync() inject429s: true, injectEmptyPages: false)); - IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection); + IAsyncEnumerable> enumerable = this.CreateEnumerable(inMemoryCollection, aggressivePrefetch); HashSet identifiers = new HashSet(); await foreach (TryCatch tryGetPage in enumerable) @@ -99,7 +105,7 @@ public async Task Test429sAsync() } [TestMethod] - public async Task Test429sWithContinuationsAsync() + public async Task Test429sWithContinuationsAsync(bool aggressivePrefetch, bool exercisePrefetch) { int numItems = 100; IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync( @@ -108,7 +114,10 @@ public async Task Test429sWithContinuationsAsync() inject429s: true, injectEmptyPages: false)); - IAsyncEnumerator> enumerator = this.CreateEnumerator(inMemoryCollection); + IAsyncEnumerator> enumerator = await this.CreateEnumeratorAsync( + inMemoryCollection, + aggressivePrefetch, + exercisePrefetch); HashSet identifiers = new HashSet(); TState state = default; @@ -130,7 +139,7 @@ public async Task Test429sWithContinuationsAsync() } // Create a new enumerator from that state to simulate when the user want's to start resume later from a continuation token. - enumerator = this.CreateEnumerator(inMemoryCollection, false, state); + enumerator = await this.CreateEnumeratorAsync(inMemoryCollection, false, false, state); } else { @@ -168,9 +177,10 @@ protected abstract IAsyncEnumerable> CreateEnumerable( bool aggressivePrefetch = false, TState state = null); - protected abstract IAsyncEnumerator> CreateEnumerator( + protected abstract Task>> CreateEnumeratorAsync( IDocumentContainer inMemoryCollection, bool aggressivePrefetch = false, + bool exercisePrefetch = false, TState state = null, CancellationToken cancellationToken = default); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs index 02b99ddbe3..cf63ef8f10 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/SinglePartitionPartitionRangeEnumeratorTests.cs @@ -22,14 +22,14 @@ public sealed class SinglePartitionPartitionRangeEnumeratorTests public async Task Test429sAsync() { Implementation implementation = new Implementation(); - await implementation.Test429sAsync(); + await implementation.Test429sAsync(false); } [TestMethod] public async Task Test429sWithContinuationsAsync() { Implementation implementation = new Implementation(); - await implementation.Test429sWithContinuationsAsync(); + await implementation.Test429sWithContinuationsAsync(false, false); } [TestMethod] @@ -50,7 +50,7 @@ public async Task TestEmptyPages() public async Task TestResumingFromStateAsync() { Implementation implementation = new Implementation(); - await implementation.TestResumingFromStateAsync(); + await implementation.TestResumingFromStateAsync(false, false); } [TestMethod] @@ -139,13 +139,14 @@ protected override IAsyncEnumerable> CreateEnumerable( trace: NoOpTrace.Singleton); } - protected override IAsyncEnumerator> CreateEnumerator( + protected override Task>> CreateEnumeratorAsync( IDocumentContainer inMemoryCollection, bool aggressivePrefetch = false, + bool exercisePrefetch = false, ReadFeedState state = null, CancellationToken cancellationToken = default) { - return new TracingAsyncEnumerator>( + IAsyncEnumerator> enumerator = new TracingAsyncEnumerator>( new ReadFeedPartitionRangeEnumerator( inMemoryCollection, feedRangeState: new FeedRangeState( @@ -154,6 +155,8 @@ protected override IAsyncEnumerator> CreateEnumerator( readFeedPaginationOptions: new ReadFeedPaginationOptions(pageSizeHint: 10), cancellationToken: cancellationToken), trace: NoOpTrace.Singleton); + + return Task.FromResult(enumerator); } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs index 847d5cbb1d..b08f01e317 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/OrderByQueryPartitionRangePageAsyncEnumeratorTests.cs @@ -44,9 +44,10 @@ protected override IAsyncEnumerable> CreateEnumerable throw new NotImplementedException(); } - protected override IAsyncEnumerator> CreateEnumerator( + protected override Task>> CreateEnumeratorAsync( IDocumentContainer documentContainer, bool aggressivePrefetch = false, + bool exercisePrefetch = false, QueryState state = null, CancellationToken cancellationToken = default) { @@ -54,7 +55,8 @@ protected override IAsyncEnumerator> CreateEnumerator trace: NoOpTrace.Singleton, cancellationToken: cancellationToken).Result; Assert.AreEqual(1, ranges.Count); - return new TracingAsyncEnumerator>( + + IAsyncEnumerator> enumerator = new TracingAsyncEnumerator>( new OrderByQueryPartitionRangePageAsyncEnumerator( queryDataSource: documentContainer, sqlQuerySpec: new Cosmos.Query.Core.SqlQuerySpec("SELECT * FROM c"), @@ -64,6 +66,8 @@ protected override IAsyncEnumerator> CreateEnumerator filter: "filter", cancellationToken: cancellationToken), NoOpTrace.Singleton); + + return Task.FromResult(enumerator); } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs index a62a4420c6..fc9d10bf10 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/AggressivePrefetchPipelineTests.cs @@ -168,6 +168,7 @@ public MockDocumentContainer( this.semaphore = new SemaphoreSlim(0, maxConcurrency); this.cancellationToken = cancellationToken; } + public override async Task> MonadicQueryAsync( SqlQuerySpec sqlQuerySpec, FeedRangeState feedRangeState, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs index c1ca0b8faa..4672fb17f5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/QueryPartitionRangePageEnumeratorTests.cs @@ -22,14 +22,14 @@ public class QueryPartitionRangePageAsyncEnumeratorTests public async Task Test429sAsync() { Implementation implementation = new Implementation(); - await implementation.Test429sAsync(); + await implementation.Test429sAsync(false); } [TestMethod] public async Task Test429sWithContinuationsAsync() { Implementation implementation = new Implementation(); - await implementation.Test429sWithContinuationsAsync(); + await implementation.Test429sWithContinuationsAsync(false, false); } [TestMethod] @@ -50,7 +50,7 @@ public async Task TestEmptyPages() public async Task TestResumingFromStateAsync() { Implementation implementation = new Implementation(); - await implementation.TestResumingFromStateAsync(); + await implementation.TestResumingFromStateAsync(false, false); } [TestMethod] @@ -73,7 +73,7 @@ public async Task TestSplitAsync() { int numItems = 100; IDocumentContainer documentContainer = await this.CreateDocumentContainerAsync(numItems); - IAsyncEnumerator> enumerator = this.CreateEnumerator(documentContainer); + IAsyncEnumerator> enumerator = await this.CreateEnumeratorAsync(documentContainer); (HashSet parentIdentifiers, QueryState state) = await this.PartialDrainAsync(enumerator, numIterations: 3); @@ -148,9 +148,10 @@ protected override IAsyncEnumerable> CreateEnumerable( trace: NoOpTrace.Singleton); } - protected override IAsyncEnumerator> CreateEnumerator( + protected override Task>> CreateEnumeratorAsync( IDocumentContainer documentContainer, bool aggressivePrefetch = false, + bool exercisePrefetch = false, QueryState state = default, CancellationToken cancellationToken = default) { @@ -158,7 +159,8 @@ protected override IAsyncEnumerator> CreateEnumerator( trace: NoOpTrace.Singleton, cancellationToken: default).Result; Assert.AreEqual(1, ranges.Count); - return new TracingAsyncEnumerator>( + + IAsyncEnumerator> enumerator = new TracingAsyncEnumerator>( enumerator: new QueryPartitionRangePageAsyncEnumerator( queryDataSource: documentContainer, sqlQuerySpec: new Cosmos.Query.Core.SqlQuerySpec("SELECT * FROM c"), @@ -167,6 +169,8 @@ protected override IAsyncEnumerator> CreateEnumerator( queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), cancellationToken: cancellationToken), trace: NoOpTrace.Singleton); + + return Task.FromResult(enumerator); } } }