From 5b1926ab8bb0e3b32865d95f70f170c6a443341a Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 06:47:40 -0700 Subject: [PATCH 01/11] Correctness of test --- .../Contracts/ContractTests.cs | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs index 170dba8bb2..6ec15cb227 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs @@ -17,6 +17,8 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests.Contracts using System.IO; using Newtonsoft.Json.Linq; using Microsoft.Azure.Cosmos.ChangeFeed; + using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.Monads; [EmulatorTests.TestClass] public class ContractTests : BaseCosmosClientHelper @@ -142,9 +144,9 @@ public async Task ChangeFeed_FeedRange_FromV0Token() ContainerInternal container = (ContainerInlineCore)largerContainer; - int expected = 100; + int expected = 3; int count = 0; - await this.CreateRandomItems((ContainerCore)container, expected, randomPartitionKey: true); + await this.CreateRandomItems((ContainerCore)container, expected, randomPartitionKey: false); IReadOnlyList feedRanges = await container.GetFeedRangesAsync(); List continuations = new List(); @@ -161,23 +163,35 @@ public async Task ChangeFeed_FeedRange_FromV0Token() changeFeedMode: ChangeFeedMode.Incremental, changeFeedRequestOptions: requestOptions) as ChangeFeedIteratorCore; ResponseMessage firstResponse = await feedIterator.ReadNextAsync(); - FeedRangeEpk FeedRangeEpk = feedRange as FeedRangeEpk; // Construct the continuation's range, using PKRangeId + ETag List ct = new List() { new { - min = FeedRangeEpk.Range.Min, - max = FeedRangeEpk.Range.Max, - token = (string)null + FeedRange = new + { + type = "Physical Partition Key Range Id", + value = pkRangeIds.First() + }, + State = new + { + type = "continuation", + value = JObject.Parse(firstResponse.ContinuationToken)["Continuation"][0]["State"]["value"].ToString() + } } }; + if (firstResponse.Content != null) + { + Collection response = TestCommon.SerializerCore.FromStream>(firstResponse.Content).Data; + count += response.Count; + } + // Extract Etag and manually construct the continuation dynamic oldContinuation = new { - V = 0, + V = 2, Rid = await container.GetCachedRIDAsync(cancellationToken: this.cancellationToken), Continuation = ct }; @@ -190,7 +204,6 @@ public async Task ChangeFeed_FeedRange_FromV0Token() ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions() { PageSizeHint = 100, - EmitOldContinuationToken = true, }; ChangeFeedIteratorCore feedIterator = container.GetChangeFeedStreamIterator( changeFeedStartFrom: ChangeFeedStartFrom.ContinuationToken(continuation), @@ -202,8 +215,13 @@ public async Task ChangeFeed_FeedRange_FromV0Token() Collection response = TestCommon.SerializerCore.FromStream>(firstResponse.Content).Data; count += response.Count; string migratedContinuation = firstResponse.ContinuationToken; - Assert.IsTrue(FeedRangeContinuation.TryParse(migratedContinuation, out FeedRangeContinuation feedRangeContinuation)); - Assert.IsTrue(feedRangeContinuation.FeedRange is FeedRangeEpk); + TryCatch monadicParsedToken = CosmosElement.Monadic.Parse(migratedContinuation); + Assert.IsFalse(monadicParsedToken.Failed); + TryCatch monadicVersionedToken = VersionedAndRidCheckedCompositeToken + .MonadicCreateFromCosmosElement(monadicParsedToken.Result); + Assert.IsFalse(monadicVersionedToken.Failed); + VersionedAndRidCheckedCompositeToken versionedAndRidCheckedCompositeToken = monadicVersionedToken.Result; + Assert.AreEqual(VersionedAndRidCheckedCompositeToken.Version.V2, versionedAndRidCheckedCompositeToken.VersionNumber); } } From 553d7b585cc6bbb984411786bc6bd350c6e1d513 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 10:54:14 -0700 Subject: [PATCH 02/11] Adding NextRange --- .../CrossPartitionRangePageAsyncEnumerator.cs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs index 3a137f4dcf..6f19ab95a9 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs @@ -97,6 +97,8 @@ public CrossPartitionRangePageAsyncEnumerator( public FeedRangeInternal CurrentRange { get; private set; } + public FeedRangeInternal NextRange { get; private set; } + public ValueTask MoveNextAsync() { return this.MoveNextAsync(NoOpTrace.Singleton); @@ -120,6 +122,7 @@ public async ValueTask MoveNextAsync(ITrace trace) { this.Current = default; this.CurrentRange = default; + this.NextRange = default; return false; } @@ -185,6 +188,7 @@ public async ValueTask MoveNextAsync(ITrace trace) this.Current = TryCatch>.FromException(currentPaginator.Current.Exception); this.CurrentRange = currentPaginator.FeedRangeState.FeedRange; + this.NextRange = CrossPartitionRangePageAsyncEnumerator.GetNextRange(enumerators); return true; } @@ -214,6 +218,7 @@ public async ValueTask MoveNextAsync(ITrace trace) this.Current = TryCatch>.FromResult( new CrossFeedRangePage(currentPaginator.Current.Result, crossPartitionState)); this.CurrentRange = currentPaginator.FeedRangeState.FeedRange; + this.NextRange = CrossPartitionRangePageAsyncEnumerator.GetNextRange(enumerators); return true; } } @@ -236,6 +241,17 @@ private static bool IsSplitException(Exception exeception) && (cosmosException.SubStatusCode == (int)Documents.SubStatusCodes.PartitionKeyRangeGone); } + private static FeedRangeInternal GetNextRange(IQueue> enumerators) + { + if (enumerators == null + || enumerators.Count == 0) + { + return default; + } + + return enumerators.Peek()?.FeedRangeState.FeedRange; + } + private interface IQueue : IEnumerable { T Peek(); From d25f28171e592e9bf80b9bfa3be699ae040c0a5e Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 10:57:35 -0700 Subject: [PATCH 03/11] Consuming NextRange --- ...CrossPartitionChangeFeedAsyncEnumerator.cs | 79 ++++++++++--------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs index 98e4159dd0..2205bbc9c5 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs @@ -69,54 +69,59 @@ public async ValueTask MoveNextAsync(ITrace trace) ChangeFeedPage backendPage = crossFeedRangePage.Page; if (backendPage is ChangeFeedNotModifiedPage) { - using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info)) + // Keep draining the cross partition enumerator until + // We get a non 304 page or we loop back to the same range or run into an exception + FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange; + FeedRangeInternal nextRange = this.crossPartitionEnumerator.NextRange; + // No point on draining when the state has 1 range + if (!originalRange.Equals(nextRange)) { - // Keep draining the cross partition enumerator until - // We get a non 304 page or we loop back to the same range or run into an exception - FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange; - double totalRequestCharge = backendPage.RequestCharge; - do + using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info)) { - if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages)) + double totalRequestCharge = backendPage.RequestCharge; + do { - throw new InvalidOperationException("ChangeFeed should always have a next page."); + if (!await this.crossPartitionEnumerator.MoveNextAsync(drainNotModifedPages)) + { + throw new InvalidOperationException("ChangeFeed should always have a next page."); + } + + monadicCrossPartitionPage = this.crossPartitionEnumerator.Current; + if (monadicCrossPartitionPage.Failed) + { + // Buffer the exception, since we need to return the request charge so far. + this.bufferedException = TryCatch>.FromException(monadicCrossPartitionPage.Exception); + } + else + { + crossFeedRangePage = monadicCrossPartitionPage.Result; + backendPage = crossFeedRangePage.Page; + totalRequestCharge += backendPage.RequestCharge; + } } + while (!(backendPage is ChangeFeedSuccessPage + || this.crossPartitionEnumerator.NextRange.Equals(originalRange) + || this.bufferedException.HasValue)); - monadicCrossPartitionPage = this.crossPartitionEnumerator.Current; - if (monadicCrossPartitionPage.Failed) + // Create a page with the aggregated request charge + if (backendPage is ChangeFeedSuccessPage changeFeedSuccessPage) { - // Buffer the exception, since we need to return the request charge so far. - this.bufferedException = TryCatch>.FromException(monadicCrossPartitionPage.Exception); + backendPage = new ChangeFeedSuccessPage( + changeFeedSuccessPage.Content, + totalRequestCharge, + changeFeedSuccessPage.ActivityId, + changeFeedSuccessPage.AdditionalHeaders, + changeFeedSuccessPage.State); } else { - crossFeedRangePage = monadicCrossPartitionPage.Result; - backendPage = crossFeedRangePage.Page; - totalRequestCharge += backendPage.RequestCharge; + backendPage = new ChangeFeedNotModifiedPage( + totalRequestCharge, + backendPage.ActivityId, + backendPage.AdditionalHeaders, + backendPage.State); } } - while (!(backendPage is ChangeFeedSuccessPage - || this.crossPartitionEnumerator.CurrentRange.Equals(originalRange) - || this.bufferedException.HasValue)); - - // Create a page with the aggregated request charge - if (backendPage is ChangeFeedSuccessPage changeFeedSuccessPage) - { - backendPage = new ChangeFeedSuccessPage( - changeFeedSuccessPage.Content, - totalRequestCharge, - changeFeedSuccessPage.ActivityId, - changeFeedSuccessPage.AdditionalHeaders, - changeFeedSuccessPage.State); - } - else - { - backendPage = new ChangeFeedNotModifiedPage( - totalRequestCharge, - backendPage.ActivityId, - backendPage.AdditionalHeaders, - backendPage.State); - } } } From 5f38db091ce5204decf80f5e7c11d65c69a299d2 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 10:57:45 -0700 Subject: [PATCH 04/11] Adding tests --- ...PartitionChangeFeedAsyncEnumeratorTests.cs | 258 +++++++++++++++++- 1 file changed, 248 insertions(+), 10 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs index c0dc4fe422..837d4a9c12 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs @@ -17,6 +17,9 @@ namespace Microsoft.Azure.Cosmos.Tests.ChangeFeed using System.IO; using Microsoft.Azure.Cosmos.Tracing; using System.Collections.Immutable; + using Moq; + using System.Threading; + using System.Text; [TestClass] public sealed class CrossPartitionChangeFeedAsyncEnumeratorTests @@ -83,7 +86,7 @@ public async Task StartFromBeginningAsync(bool useContinuations) ChangeFeedPaginationOptions.Default, cancellationToken: default); - int globalCount = await (useContinuations + (int globalCount, double _) = await (useContinuations ? DrainWithUntilNotModifiedWithContinuationTokens(documentContainer, enumerator) : DrainUntilNotModifedAsync(enumerator)); Assert.AreEqual(numItems, globalCount); @@ -120,7 +123,7 @@ public async Task StartFromTimeAsync(bool useContinuations) } } - int globalCount = await (useContinuations + (int globalCount, double _) = await (useContinuations ? DrainWithUntilNotModifiedWithContinuationTokens(documentContainer, enumerator) : DrainUntilNotModifedAsync(enumerator)); @@ -144,9 +147,11 @@ public async Task StartFromNowAsync(bool useContinuations) ChangeFeedPaginationOptions.Default, cancellationToken: default); - Assert.AreEqual(0, await (useContinuations + (int globalCount, double _) = await (useContinuations ? DrainWithUntilNotModifiedWithContinuationTokens(documentContainer, enumerator) - : DrainUntilNotModifedAsync(enumerator))); + : DrainUntilNotModifedAsync(enumerator)); + + Assert.AreEqual(0, globalCount); for (int i = 0; i < numItems; i++) { @@ -162,18 +167,231 @@ public async Task StartFromNowAsync(bool useContinuations) } } - int globalCount = await (useContinuations + (int globalCountAfter, double _) = await (useContinuations ? DrainWithUntilNotModifiedWithContinuationTokens(documentContainer, enumerator) : DrainUntilNotModifedAsync(enumerator)); - Assert.AreEqual(numItems, globalCount); + Assert.AreEqual(numItems, globalCountAfter); + } + + // Verifies that it cycles on all the internal ranges before returning a 304 + // The number of requests should be equal to the number of internal ranges + [DataTestMethod] + [Timeout(5000)] + [DataRow(1, DisplayName = "FeedRange spanning 1 partition")] + [DataRow(2, DisplayName = "FeedRange spanning 2 partitions")] + [DataRow(3, DisplayName = "FeedRange spanning 3 partitions")] + public async Task ShouldReturnNotModifiedAfterCyclingOnAllRanges(int partitions) + { + ReadOnlyMemory> rangeStates = null; + + if (partitions == 1) + { + rangeStates = new FeedRangeState[]{ + new FeedRangeState(FeedRangeEpk.FullRange, ChangeFeedState.Now()) + }; + } + if (partitions == 2) + { + rangeStates = new FeedRangeState[]{ + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("AA", "FF", true, false)), ChangeFeedState.Now()), + }; + } + if (partitions == 3) + { + rangeStates = new FeedRangeState[]{ + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("AA", "BB", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("BB", "FF", true, false)), ChangeFeedState.Now()), + }; + } + + Assert.IsNotNull(rangeStates, $"Range states not initialized for {partitions} partitions"); + + CrossFeedRangeState state = new CrossFeedRangeState(rangeStates); + Mock documentContainer = new Mock(); + + // Returns a 304 with 1RU charge + documentContainer.Setup(c => c.MonadicChangeFeedAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny())).ReturnsAsync( + (FeedRangeState state, ChangeFeedPaginationOptions options, ITrace trace, CancellationToken token) + => TryCatch.FromResult(new ChangeFeedNotModifiedPage(requestCharge: 1, activityId: string.Empty, additionalHeaders: default, state.State))); + CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create( + documentContainer.Object, + state, + ChangeFeedPaginationOptions.Default, + cancellationToken: default); + + (int _, double requestCharge) = await DrainUntilNotModifedAsync(enumerator); + + // Verify the number of calls were the expected + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Exactly(partitions)); + + // Verify the RU is being summarized + Assert.AreEqual(partitions, requestCharge, "Should sum requestcharge of all notmodified pages"); + + // Verify the calls match the ranges + if (partitions == 1) + { + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(FeedRangeEpk.FullRange)), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + } + if (partitions == 2) + { + rangeStates = new FeedRangeState[]{ + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("AA", "FF", true, false)), ChangeFeedState.Now()), + }; + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("AA", "FF", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + } + if (partitions == 3) + { + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("AA", "BB", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("BB", "FF", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + } + } + + // Verifies that in a FeedRange with any partitions, the not modified are skipped until we find one that contains results + [TestMethod] + [Timeout(5000)] + public async Task ShouldSkipNotModifiedAndReturnResults() + { + ReadOnlyMemory> rangeStates = new FeedRangeState[]{ + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("AA", "BB", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("BB", "CC", true, false)), ChangeFeedState.Now()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("CC", "FF", true, false)), ChangeFeedState.Now()), + }; + + CrossFeedRangeState state = new CrossFeedRangeState(rangeStates); + Mock documentContainer = new Mock(); + + const double expectedTotalRU = 5 + 1 + 1; // 2x 304s + OK + + // Returns a 304 with 1RU charge on <>-AA + documentContainer.Setup(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny())).ReturnsAsync( + (FeedRangeState state, ChangeFeedPaginationOptions options, ITrace trace, CancellationToken token) + => TryCatch.FromResult(new ChangeFeedNotModifiedPage(requestCharge: 1, activityId: string.Empty, additionalHeaders: default, state.State))); + + // Returns a 304 with 1RU charge on AA-BB + documentContainer.Setup(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("AA", "BB", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny())).ReturnsAsync( + (FeedRangeState state, ChangeFeedPaginationOptions options, ITrace trace, CancellationToken token) + => TryCatch.FromResult(new ChangeFeedNotModifiedPage(requestCharge: 1, activityId: string.Empty, additionalHeaders: default, state.State))); + + // Returns a 200 with 5RU charge on BB-CC + documentContainer.Setup(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("BB", "CC", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny())).ReturnsAsync( + (FeedRangeState state, ChangeFeedPaginationOptions options, ITrace trace, CancellationToken token) + => TryCatch.FromResult(new ChangeFeedSuccessPage(content: new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}")), requestCharge: 5, activityId: string.Empty, additionalHeaders: default, state.State))); + + // Returns a 304 with 1RU charge on CC-FF + documentContainer.Setup(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("CC", "FF", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny())).ReturnsAsync( + (FeedRangeState state, ChangeFeedPaginationOptions options, ITrace trace, CancellationToken token) + => TryCatch.FromResult(new ChangeFeedNotModifiedPage(requestCharge: 1, activityId: string.Empty, additionalHeaders: default, state.State))); + + CrossPartitionChangeFeedAsyncEnumerator enumerator = CrossPartitionChangeFeedAsyncEnumerator.Create( + documentContainer.Object, + state, + ChangeFeedPaginationOptions.Default, + cancellationToken: default); + + (int _, double requestCharge) = await DrainUntilSuccessAsync(enumerator); + + // Verify the number of calls were the expected + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Exactly(3)); + + // Verify the RU is being summarized + Assert.AreEqual(expectedTotalRU, requestCharge, "Should sum requestcharge of all notmodified pages"); + + // Verify the calls match the ranges + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("AA", "BB", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("BB", "CC", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Once); + + documentContainer.Verify(c => c.MonadicChangeFeedAsync( + It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("CC", "FF", true, false)))), + It.IsAny(), + It.IsAny(), + It.IsAny()), Times.Never); } - private static async Task DrainUntilNotModifedAsync(CrossPartitionChangeFeedAsyncEnumerator enumerator) + private static async Task<(int, double)> DrainUntilNotModifedAsync(CrossPartitionChangeFeedAsyncEnumerator enumerator) { int globalCount = 0; + double requestCharge = 0; while (await enumerator.MoveNextAsync()) { Assert.IsTrue(enumerator.Current.Succeeded); + requestCharge += enumerator.Current.Result.Page.RequestCharge; if (!(enumerator.Current.Result.Page is ChangeFeedSuccessPage changeFeedSuccessPage)) { break; @@ -182,13 +400,32 @@ private static async Task DrainUntilNotModifedAsync(CrossPartitionChangeFee globalCount += GetResponseCount(changeFeedSuccessPage.Content); } - return globalCount; + return (globalCount, requestCharge); + } + + private static async Task<(int, double)> DrainUntilSuccessAsync(CrossPartitionChangeFeedAsyncEnumerator enumerator) + { + int globalCount = 0; + double requestCharge = 0; + while (await enumerator.MoveNextAsync()) + { + Assert.IsTrue(enumerator.Current.Succeeded); + requestCharge += enumerator.Current.Result.Page.RequestCharge; + if (enumerator.Current.Result.Page is ChangeFeedSuccessPage changeFeedSuccessPage) + { + globalCount += GetResponseCount(changeFeedSuccessPage.Content); + break; + } + } + + return (globalCount, requestCharge); } - private static async Task DrainWithUntilNotModifiedWithContinuationTokens( + private static async Task<(int, double)> DrainWithUntilNotModifiedWithContinuationTokens( IDocumentContainer documentContainer, CrossPartitionChangeFeedAsyncEnumerator enumerator) { + double requestCharge = 0; List globalChanges = new List(); while (true) { @@ -204,6 +441,7 @@ private static async Task DrainWithUntilNotModifiedWithContinuationTokens( break; } + requestCharge += changeFeedSuccessPage.RequestCharge; CosmosArray changes = GetChanges(changeFeedSuccessPage.Content); globalChanges.AddRange(changes); @@ -214,7 +452,7 @@ private static async Task DrainWithUntilNotModifiedWithContinuationTokens( cancellationToken: default); } - return globalChanges.Count; + return (globalChanges.Count, requestCharge); } private static int GetResponseCount(Stream stream) From fdef8230d83647f92059dc1edd16ff97559bb06a Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 11:26:00 -0700 Subject: [PATCH 05/11] Revert local test changes for debugging --- .../Contracts/ContractTests.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs index 6ec15cb227..e11fac3221 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs @@ -134,7 +134,7 @@ public async Task ItemStreamContractVerifier() } [TestMethod] - [Timeout(30000)] + //[Timeout(30000)] public async Task ChangeFeed_FeedRange_FromV0Token() { ContainerResponse largerContainer = await this.database.CreateContainerAsync( @@ -144,9 +144,9 @@ public async Task ChangeFeed_FeedRange_FromV0Token() ContainerInternal container = (ContainerInlineCore)largerContainer; - int expected = 3; + int expected = 100; int count = 0; - await this.CreateRandomItems((ContainerCore)container, expected, randomPartitionKey: false); + await this.CreateRandomItems((ContainerCore)container, expected, randomPartitionKey: true); IReadOnlyList feedRanges = await container.GetFeedRangesAsync(); List continuations = new List(); From 1bc60c88119e70b1fee04f714d50a93854362d6f Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 11:28:50 -0700 Subject: [PATCH 06/11] Adding timeout --- .../Contracts/ContractTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs index e11fac3221..96a71d418b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Contracts/ContractTests.cs @@ -134,7 +134,7 @@ public async Task ItemStreamContractVerifier() } [TestMethod] - //[Timeout(30000)] + [Timeout(30000)] public async Task ChangeFeed_FeedRange_FromV0Token() { ContainerResponse largerContainer = await this.database.CreateContainerAsync( From 99b7b59a78f1aefb27c800c1b9a21fa738f416bd Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 11:55:13 -0700 Subject: [PATCH 07/11] cleanup extra code --- .../CrossPartitionChangeFeedAsyncEnumeratorTests.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs index 837d4a9c12..80e6370189 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs @@ -248,11 +248,6 @@ public async Task ShouldReturnNotModifiedAfterCyclingOnAllRanges(int partitions) } if (partitions == 2) { - rangeStates = new FeedRangeState[]{ - new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)), ChangeFeedState.Now()), - new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("AA", "FF", true, false)), ChangeFeedState.Now()), - }; - documentContainer.Verify(c => c.MonadicChangeFeedAsync( It.Is>(s => s.FeedRange.Equals(new FeedRangeEpk(new Documents.Routing.Range("", "AA", true, false)))), It.IsAny(), From 701f76d603f5a2385b5c01a3fa66b6133eee4b10 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 20 Apr 2021 12:57:33 -0700 Subject: [PATCH 08/11] baselines --- ...raceWriterBaselineTests.ScenariosAsync.xml | 44 +------------------ 1 file changed, 2 insertions(+), 42 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml index 6e6c933845..718b85458f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml @@ -1434,12 +1434,9 @@ ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── [7F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,9F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [9F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,BF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - └── [3F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,5F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + └── [9F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,BF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ]]> Date: Tue, 20 Apr 2021 13:33:40 -0700 Subject: [PATCH 09/11] more baselines --- ...aceWriterBaselineTests.ChangeFeedAsync.xml | 130 +----------------- 1 file changed, 1 insertion(+), 129 deletions(-) diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.ChangeFeedAsync.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.ChangeFeedAsync.xml index 8a5adbeee7..bf175ce321 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.ChangeFeedAsync.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/BaselineTest/TestBaseline/EndToEndTraceWriterBaselineTests.ChangeFeedAsync.xml @@ -185,23 +185,8 @@ │ │ [Client Side Request Stats] │ │ Redacted To Not Change The Baselines From Run To Run │ │ ) - │ ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── [05C1E7FFFFFFFA,FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ ├── Get Partition Key Range Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ ├── Get Collection Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ ├── Try Get Overlapping Ranges(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── Microsoft.Azure.Cosmos.Handlers.DiagnosticsHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── Microsoft.Azure.Cosmos.Handlers.RetryHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── Microsoft.Azure.Cosmos.Handlers.RouterHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── Microsoft.Azure.Cosmos.Handlers.TransportHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ └── Microsoft.Azure.Documents.ServerStoreModel Transport Request(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ │ ( - │ │ [Client Side Request Stats] - │ │ Redacted To Not Change The Baselines From Run To Run - │ │ ) │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [,05C1CFFFFFFFF8) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [05C1E7FFFFFFFA,FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler(00000000-0000-0000-0000-000000000000) RequestHandler-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ ├── Get Partition Key Range Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ ├── Get Collection Cache(00000000-0000-0000-0000-000000000000) Routing-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds @@ -1548,119 +1533,6 @@ ] } ] - }, - { - "name": "MoveNextAsync", - "id": "00000000-0000-0000-0000-000000000000", - "caller info": { - "member": "MemberName", - "file": "FilePath", - "line": 42 - }, - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "[,05C1CFFFFFFFF8) move next", - "id": "00000000-0000-0000-0000-000000000000", - "caller info": { - "member": "MemberName", - "file": "FilePath", - "line": 42 - }, - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler", - "id": "00000000-0000-0000-0000-000000000000", - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Get Partition Key Range Cache", - "id": "00000000-0000-0000-0000-000000000000", - "caller info": { - "member": "MemberName", - "file": "FilePath", - "line": 42 - }, - "start time": "12:00:00:000", - "duration in milliseconds": 0 - }, - { - "name": "Get Collection Cache", - "id": "00000000-0000-0000-0000-000000000000", - "caller info": { - "member": "MemberName", - "file": "FilePath", - "line": 42 - }, - "start time": "12:00:00:000", - "duration in milliseconds": 0 - }, - { - "name": "Try Get Overlapping Ranges", - "id": "00000000-0000-0000-0000-000000000000", - "caller info": { - "member": "MemberName", - "file": "FilePath", - "line": 42 - }, - "start time": "12:00:00:000", - "duration in milliseconds": 0 - }, - { - "name": "Microsoft.Azure.Cosmos.Handlers.DiagnosticsHandler", - "id": "00000000-0000-0000-0000-000000000000", - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Microsoft.Azure.Cosmos.Handlers.RetryHandler", - "id": "00000000-0000-0000-0000-000000000000", - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Microsoft.Azure.Cosmos.Handlers.RouterHandler", - "id": "00000000-0000-0000-0000-000000000000", - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Microsoft.Azure.Cosmos.Handlers.TransportHandler", - "id": "00000000-0000-0000-0000-000000000000", - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "children": [ - { - "name": "Microsoft.Azure.Documents.ServerStoreModel Transport Request", - "id": "00000000-0000-0000-0000-000000000000", - "caller info": { - "member": "MemberName", - "file": "FilePath", - "line": 42 - }, - "start time": "12:00:00:000", - "duration in milliseconds": 0, - "data": { - "Client Side Request Stats": "Redacted To Not Change The Baselines From Run To Run" - } - } - ] - } - ] - } - ] - } - ] - } - ] - } - ] - } - ] } ] } From acf684ed44d1391526df181eac763a692bfe4c7d Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Fri, 23 Apr 2021 07:58:59 -0700 Subject: [PATCH 10/11] Refactoring --- ...CrossPartitionChangeFeedAsyncEnumerator.cs | 13 +++++++--- .../CrossPartitionRangePageAsyncEnumerator.cs | 25 +++++++++++++------ ...PartitionChangeFeedAsyncEnumeratorTests.cs | 1 - 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs index 2205bbc9c5..91762d1ff6 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs @@ -72,9 +72,8 @@ public async ValueTask MoveNextAsync(ITrace trace) // Keep draining the cross partition enumerator until // We get a non 304 page or we loop back to the same range or run into an exception FeedRangeInternal originalRange = this.crossPartitionEnumerator.CurrentRange; - FeedRangeInternal nextRange = this.crossPartitionEnumerator.NextRange; // No point on draining when the state has 1 range - if (!originalRange.Equals(nextRange)) + if (!IsNextRangeEqualToOriginal(this.crossPartitionEnumerator, originalRange)) { using (ITrace drainNotModifedPages = changeFeedMoveNextTrace.StartChild("Drain NotModified Pages", TraceComponent.ChangeFeed, TraceLevel.Info)) { @@ -100,7 +99,7 @@ public async ValueTask MoveNextAsync(ITrace trace) } } while (!(backendPage is ChangeFeedSuccessPage - || this.crossPartitionEnumerator.NextRange.Equals(originalRange) + || IsNextRangeEqualToOriginal(this.crossPartitionEnumerator, originalRange) || this.bufferedException.HasValue)); // Create a page with the aggregated request charge @@ -165,6 +164,14 @@ public static CrossPartitionChangeFeedAsyncEnumerator Create( return enumerator; } + private static bool IsNextRangeEqualToOriginal( + CrossPartitionRangePageAsyncEnumerator crossPartitionEnumerator, + FeedRangeInternal originalRange) + { + return crossPartitionEnumerator.TryPeek(out FeedRangeState nextState) + && originalRange.Equals(nextState.FeedRange); + } + private static CreatePartitionRangePageAsyncEnumerator MakeCreateFunction( IChangeFeedDataSource changeFeedDataSource, ChangeFeedPaginationOptions changeFeedPaginationOptions, diff --git a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs index 6f19ab95a9..bf66b03ec6 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs @@ -27,6 +27,7 @@ internal sealed class CrossPartitionRangePageAsyncEnumerator : IA private readonly CreatePartitionRangePageAsyncEnumerator createPartitionRangeEnumerator; private readonly AsyncLazy>> lazyEnumerators; private CancellationToken cancellationToken; + private FeedRangeState? nextState; public CrossPartitionRangePageAsyncEnumerator( IFeedRangeProvider feedRangeProvider, @@ -97,8 +98,6 @@ public CrossPartitionRangePageAsyncEnumerator( public FeedRangeInternal CurrentRange { get; private set; } - public FeedRangeInternal NextRange { get; private set; } - public ValueTask MoveNextAsync() { return this.MoveNextAsync(NoOpTrace.Singleton); @@ -122,7 +121,7 @@ public async ValueTask MoveNextAsync(ITrace trace) { this.Current = default; this.CurrentRange = default; - this.NextRange = default; + this.nextState = default; return false; } @@ -188,7 +187,7 @@ public async ValueTask MoveNextAsync(ITrace trace) this.Current = TryCatch>.FromException(currentPaginator.Current.Exception); this.CurrentRange = currentPaginator.FeedRangeState.FeedRange; - this.NextRange = CrossPartitionRangePageAsyncEnumerator.GetNextRange(enumerators); + this.nextState = CrossPartitionRangePageAsyncEnumerator.GetNextRange(enumerators); return true; } @@ -218,7 +217,7 @@ public async ValueTask MoveNextAsync(ITrace trace) this.Current = TryCatch>.FromResult( new CrossFeedRangePage(currentPaginator.Current.Result, crossPartitionState)); this.CurrentRange = currentPaginator.FeedRangeState.FeedRange; - this.NextRange = CrossPartitionRangePageAsyncEnumerator.GetNextRange(enumerators); + this.nextState = CrossPartitionRangePageAsyncEnumerator.GetNextRange(enumerators); return true; } } @@ -229,6 +228,18 @@ public ValueTask DisposeAsync() return default; } + public bool TryPeek(out FeedRangeState nextState) + { + if (this.nextState.HasValue) + { + nextState = this.nextState.Value; + return true; + } + + nextState = default; + return false; + } + public void SetCancellationToken(CancellationToken cancellationToken) { this.cancellationToken = cancellationToken; @@ -241,7 +252,7 @@ private static bool IsSplitException(Exception exeception) && (cosmosException.SubStatusCode == (int)Documents.SubStatusCodes.PartitionKeyRangeGone); } - private static FeedRangeInternal GetNextRange(IQueue> enumerators) + private static FeedRangeState? GetNextRange(IQueue> enumerators) { if (enumerators == null || enumerators.Count == 0) @@ -249,7 +260,7 @@ private static FeedRangeInternal GetNextRange(IQueue : IEnumerable diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs index 80e6370189..224157111b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/CrossPartitionChangeFeedAsyncEnumeratorTests.cs @@ -16,7 +16,6 @@ namespace Microsoft.Azure.Cosmos.Tests.ChangeFeed using Microsoft.Azure.Cosmos.ChangeFeed.Pagination; using System.IO; using Microsoft.Azure.Cosmos.Tracing; - using System.Collections.Immutable; using Moq; using System.Threading; using System.Text; From bb3feb3e861ffe38628e6c711de72cc25c0fecf1 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Fri, 23 Apr 2021 08:16:26 -0700 Subject: [PATCH 11/11] TryPeekNext --- .../Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs | 2 +- .../src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs index 91762d1ff6..f681e922af 100644 --- a/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/ChangeFeed/Pagination/CrossPartitionChangeFeedAsyncEnumerator.cs @@ -168,7 +168,7 @@ private static bool IsNextRangeEqualToOriginal( CrossPartitionRangePageAsyncEnumerator crossPartitionEnumerator, FeedRangeInternal originalRange) { - return crossPartitionEnumerator.TryPeek(out FeedRangeState nextState) + return crossPartitionEnumerator.TryPeekNext(out FeedRangeState nextState) && originalRange.Equals(nextState.FeedRange); } diff --git a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs index bf66b03ec6..55664cf17a 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs @@ -228,7 +228,7 @@ public ValueTask DisposeAsync() return default; } - public bool TryPeek(out FeedRangeState nextState) + public bool TryPeekNext(out FeedRangeState nextState) { if (this.nextState.HasValue) {