diff --git a/Microsoft.Azure.Cosmos/src/FeedRange/Continuations/FeedRangeCompositeContinuation.cs b/Microsoft.Azure.Cosmos/src/FeedRange/Continuations/FeedRangeCompositeContinuation.cs index 6bc9d7369f..5035a6f388 100644 --- a/Microsoft.Azure.Cosmos/src/FeedRange/Continuations/FeedRangeCompositeContinuation.cs +++ b/Microsoft.Azure.Cosmos/src/FeedRange/Continuations/FeedRangeCompositeContinuation.cs @@ -25,7 +25,6 @@ internal sealed class FeedRangeCompositeContinuation : FeedRangeContinuation public CompositeContinuationToken CurrentToken { get; private set; } private static Documents.ShouldRetryResult Retry = Documents.ShouldRetryResult.RetryAfter(TimeSpan.Zero); private static Documents.ShouldRetryResult NoRetry = Documents.ShouldRetryResult.NoRetry(); - private readonly HashSet doneRanges; private string initialNoResultsRange; private FeedRangeCompositeContinuation( @@ -34,7 +33,6 @@ private FeedRangeCompositeContinuation( : base(containerRid, feedRange) { this.CompositeContinuationTokens = new Queue(); - this.doneRanges = new HashSet(); } public override void Accept( @@ -100,7 +98,7 @@ public FeedRangeCompositeContinuation( this.CurrentToken = this.CompositeContinuationTokens.Peek(); } - public override string GetContinuation() => this.CurrentToken.Token; + public override string GetContinuation() => this.CurrentToken?.Token; public override string ToString() { @@ -109,14 +107,6 @@ public override string ToString() public override void ReplaceContinuation(string continuationToken) { - if (continuationToken == null) - { - // Normal ReadFeed can signal termination by CT null, not NotModified - // Change Feed never lands here, as it always provides a CT - // Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done - this.doneRanges.Add(this.CurrentToken.Range.Min); - } - this.CurrentToken.Token = continuationToken; this.MoveToNextToken(); } @@ -140,7 +130,7 @@ public override TryCatch ValidateContainer(string containerRid) /// /// The concept of Done is only for ReadFeed. Change Feed is never done, it is an infinite stream. /// - public override bool IsDone => this.doneRanges.Count == this.CompositeContinuationTokens.Count; + public override bool IsDone => this.CompositeContinuationTokens.Count == 0; public override Documents.ShouldRetryResult HandleChangeFeedNotModified(ResponseMessage responseMessage) { @@ -208,6 +198,38 @@ public override Documents.ShouldRetryResult HandleChangeFeedNotModified(Response } } + private static bool TryParseAsCompositeContinuationToken( + string providedContinuation, + out CompositeContinuationToken compositeContinuationToken) + { + compositeContinuationToken = null; + try + { + if (providedContinuation.Trim().StartsWith("[", StringComparison.Ordinal)) + { + List compositeContinuationTokens = JsonConvert.DeserializeObject>(providedContinuation); + + if (compositeContinuationTokens != null && compositeContinuationTokens.Count > 0) + { + compositeContinuationToken = compositeContinuationTokens[0]; + } + + return compositeContinuationToken != null; + } + else if (providedContinuation.Trim().StartsWith("{", StringComparison.Ordinal)) + { + compositeContinuationToken = JsonConvert.DeserializeObject(providedContinuation); + return compositeContinuationToken != null; + } + + return false; + } + catch (JsonException) + { + return false; + } + } + private static CompositeContinuationToken CreateCompositeContinuationTokenForRange( string minInclusive, string maxExclusive, @@ -223,32 +245,52 @@ private static CompositeContinuationToken CreateCompositeContinuationTokenForRan private void MoveToNextToken() { CompositeContinuationToken recentToken = this.CompositeContinuationTokens.Dequeue(); - this.CompositeContinuationTokens.Enqueue(recentToken); - this.CurrentToken = this.CompositeContinuationTokens.Peek(); - - // In a Query / ReadFeed not Change Feed, skip ranges that are done to avoid requests - while (!this.IsDone && - this.doneRanges.Contains(this.CurrentToken.Range.Min)) + if (recentToken.Token != null) { - this.MoveToNextToken(); + // Normal ReadFeed can signal termination by CT null, not NotModified + // Change Feed never lands here, as it always provides a CT + // Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done + this.CompositeContinuationTokens.Enqueue(recentToken); } + + this.CurrentToken = this.CompositeContinuationTokens.Count > 0 ? this.CompositeContinuationTokens.Peek() : null; } private void CreateChildRanges(IReadOnlyList keyRanges) { if (keyRanges == null) throw new ArgumentNullException(nameof(keyRanges)); - // Update current Documents.PartitionKeyRange firstRange = keyRanges[0]; this.CurrentToken.Range = new Documents.Routing.Range(firstRange.MinInclusive, firstRange.MaxExclusive, true, false); - // Add children - foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1)) + if (FeedRangeCompositeContinuation.TryParseAsCompositeContinuationToken( + this.CurrentToken.Token, + out CompositeContinuationToken continuationAsComposite)) { - this.CompositeContinuationTokens.Enqueue( - FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange( - keyRange.MinInclusive, - keyRange.MaxExclusive, - this.CurrentToken.Token)); + // Update the internal composite continuation + continuationAsComposite.Range = this.CurrentToken.Range; + this.CurrentToken.Token = JsonConvert.SerializeObject(continuationAsComposite); + // Add children + foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1)) + { + continuationAsComposite.Range = keyRange.ToRange(); + this.CompositeContinuationTokens.Enqueue( + FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange( + keyRange.MinInclusive, + keyRange.MaxExclusive, + JsonConvert.SerializeObject(continuationAsComposite))); + } + } + else + { + // Add children + foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1)) + { + this.CompositeContinuationTokens.Enqueue( + FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange( + keyRange.MinInclusive, + keyRange.MaxExclusive, + this.CurrentToken.Token)); + } } } diff --git a/Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEPK.cs b/Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEPK.cs index c89a47e585..32a4467493 100644 --- a/Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEPK.cs +++ b/Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEPK.cs @@ -17,7 +17,7 @@ internal sealed class FeedRangeEPK : FeedRangeInternal { public Documents.Routing.Range Range { get; } - public static FeedRangeEPK ForCompleteRange() + public static FeedRangeEPK ForFullRange() { return new FeedRangeEPK(new Documents.Routing.Range( Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, diff --git a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedRangeIteratorCore.cs b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedRangeIteratorCore.cs index dc5d05e2e6..e8aa5870a3 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedRangeIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/FeedIterators/FeedRangeIteratorCore.cs @@ -6,13 +6,10 @@ namespace Microsoft.Azure.Cosmos { using System; using System.Collections.Generic; - using System.IO; - using System.Runtime.InteropServices; + using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; - using Microsoft.Azure.Cosmos.Json; - using Microsoft.Azure.Cosmos.Json.Interop; using Microsoft.Azure.Cosmos.Query.Core; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; @@ -46,7 +43,7 @@ public static FeedRangeIteratorCore Create( } // Backward compatible with old format - feedRangeInternal = FeedRangeEPK.ForCompleteRange(); + feedRangeInternal = FeedRangeEPK.ForFullRange(); feedRangeContinuation = new FeedRangeCompositeContinuation( string.Empty, feedRangeInternal, @@ -62,7 +59,7 @@ public static FeedRangeIteratorCore Create( return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options); } - feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForCompleteRange(); + feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange(); return new FeedRangeIteratorCore(containerCore, feedRangeInternal, options); } diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs index c275e1e5f8..fbd8dc2a63 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs @@ -48,7 +48,7 @@ public static ChangeFeedIteratorCore Create( } } - feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForCompleteRange(); + feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange(); return new ChangeFeedIteratorCore(container, feedRangeInternal, changeFeedRequestOptions); } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/FeedRangeContinuationTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/FeedRangeContinuationTests.cs index cba5f156a3..c541d02ffd 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/FeedRangeContinuationTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/FeedRangeContinuationTests.cs @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; + using Newtonsoft.Json; [TestClass] public class FeedRangeContinuationTests @@ -98,12 +99,16 @@ public void FeedRangeCompositeContinuation_ShouldRetry() ResponseMessage okResponse = new ResponseMessage(HttpStatusCode.OK); okResponse.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + okResponse.Headers[Documents.HttpConstants.HttpHeaders.ETag] = "1"; Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(okResponse).ShouldRetry); + ResponseMessage notModified = new ResponseMessage(HttpStatusCode.NotModified); + notModified.Headers[Documents.HttpConstants.HttpHeaders.ETag] = "1"; + // A 304 on a multi Range token should cycle on all available ranges before stopping retrying - Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry); - Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry); - Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry); + Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry); + Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry); + Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry); } [TestMethod] @@ -153,6 +158,55 @@ public async Task FeedRangeCompositeContinuation_HandleSplits() Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[2].Range.Max); } + [TestMethod] + public async Task FeedRangeCompositeContinuation_HandleSplits_ReadFeed() + { + List compositeContinuationTokens = new List() + { + FeedRangeContinuationTests.BuildTokenForRange("A", "C", JsonConvert.SerializeObject(new CompositeContinuationToken() { Token = "token1", Range = new Documents.Routing.Range("A", "C", true, false) })), + FeedRangeContinuationTests.BuildTokenForRange("C", "F", JsonConvert.SerializeObject(new CompositeContinuationToken() { Token = "token2", Range = new Documents.Routing.Range("C", "F", true, false) })), + }; + + FeedRangeCompositeContinuation feedRangeCompositeContinuation = new FeedRangeCompositeContinuation(Guid.NewGuid().ToString(), Mock.Of(), JsonConvert.DeserializeObject>(JsonConvert.SerializeObject(compositeContinuationTokens))); + + MultiRangeMockDocumentClient documentClient = new MultiRangeMockDocumentClient(); + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext.Setup(c => c.DocumentClient).Returns(documentClient); + + Mock containerCore = new Mock(); + containerCore + .Setup(c => c.ClientContext).Returns(cosmosClientContext.Object); + + Assert.AreEqual(2, feedRangeCompositeContinuation.CompositeContinuationTokens.Count); + + ResponseMessage split = new ResponseMessage(HttpStatusCode.Gone); + split.Headers.SubStatusCode = Documents.SubStatusCodes.PartitionKeyRangeGone; + Assert.IsTrue((await feedRangeCompositeContinuation.HandleSplitAsync(containerCore.Object, split, default(CancellationToken))).ShouldRetry); + + // verify token state + // Split should have updated initial and created a new token at the end + Assert.AreEqual(3, feedRangeCompositeContinuation.CompositeContinuationTokens.Count); + CompositeContinuationToken[] continuationTokens = feedRangeCompositeContinuation.CompositeContinuationTokens.ToArray(); + // First token is split + Assert.AreEqual(JsonConvert.DeserializeObject(compositeContinuationTokens[0].Token).Range.Min, JsonConvert.DeserializeObject(continuationTokens[0].Token).Range.Min); + Assert.AreEqual(JsonConvert.DeserializeObject(compositeContinuationTokens[0].Token).Token, JsonConvert.DeserializeObject(continuationTokens[0].Token).Token); + Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[0].MinInclusive, continuationTokens[0].Range.Min); + Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[0].MaxExclusive, continuationTokens[0].Range.Max); + + // Second token remains the same + Assert.AreEqual(compositeContinuationTokens[1].Token, continuationTokens[1].Token); + Assert.AreEqual(compositeContinuationTokens[1].Range.Min, continuationTokens[1].Range.Min); + Assert.AreEqual(compositeContinuationTokens[1].Range.Max, continuationTokens[1].Range.Max); + + // New third token + Assert.AreEqual(JsonConvert.DeserializeObject(compositeContinuationTokens[0].Token).Range.Max, JsonConvert.DeserializeObject(continuationTokens[2].Token).Range.Max); + Assert.AreEqual(JsonConvert.DeserializeObject(compositeContinuationTokens[0].Token).Token, JsonConvert.DeserializeObject(continuationTokens[2].Token).Token); + Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MinInclusive, continuationTokens[2].Range.Min); + Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[2].Range.Max); + } + [TestMethod] public void FeedRangeCompositeContinuation_IsDone() { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ReadFeedTokenIteratorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ReadFeedTokenIteratorCoreTests.cs index 9b0612646b..4b5638fad0 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ReadFeedTokenIteratorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedRange/ReadFeedTokenIteratorCoreTests.cs @@ -31,8 +31,8 @@ public void ReadFeedIteratorCore_Create_Default() { FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of(), null, null, null); FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK; - Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min); - Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max); + Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min); + Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max); Assert.IsNull(feedTokenIterator.FeedRangeContinuation); } @@ -53,8 +53,8 @@ public void ReadFeedIteratorCore_Create_WithContinuation() string continuation = Guid.NewGuid().ToString(); FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of(), null, continuation, null); FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK; - Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min); - Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max); + Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min); + Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max); Assert.IsNotNull(feedTokenIterator.FeedRangeContinuation); Assert.AreEqual(continuation, feedTokenIterator.FeedRangeContinuation.GetContinuation()); } @@ -64,12 +64,12 @@ public void ReadFeedIteratorCore_Create_WithFeedContinuation() { string continuation = Guid.NewGuid().ToString(); - FeedRangeEPK feedRangeEPK = FeedRangeEPK.ForCompleteRange(); + FeedRangeEPK feedRangeEPK = FeedRangeEPK.ForFullRange(); FeedRangeCompositeContinuation feedRangeSimpleContinuation = new FeedRangeCompositeContinuation(Guid.NewGuid().ToString(), feedRangeEPK, new List>() { feedRangeEPK.Range }, continuation); FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of(), null, feedRangeSimpleContinuation.ToString(), null); FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK; - Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min); - Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max); + Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min); + Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max); Assert.IsNotNull(feedTokenIterator.FeedRangeContinuation); Assert.AreEqual(continuation, feedTokenIterator.FeedRangeContinuation.GetContinuation()); }