Skip to content

Commit

Permalink
FeedRange: Fix split handling for readfeed split (#1547)
Browse files Browse the repository at this point in the history
* Handling the split

* UT

* done ranges

* Remove PartitionKeyRangeHandler modifications

* test

* support for EPK on properties

* Removing constant

* test fix

* throwing exception

* Rename

* Reverting to original state

* newline

* undo autoformatting
  • Loading branch information
ealsur committed Jun 11, 2020
1 parent 180ef23 commit 74141c9
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ internal sealed class FeedRangeCompositeContinuation : FeedRangeContinuation
public CompositeContinuationToken CurrentToken { get; private set; }
private static Documents.ShouldRetryResult Retry = Documents.ShouldRetryResult.RetryAfter(TimeSpan.Zero);
private static Documents.ShouldRetryResult NoRetry = Documents.ShouldRetryResult.NoRetry();
private readonly HashSet<string> doneRanges;
private string initialNoResultsRange;

private FeedRangeCompositeContinuation(
Expand All @@ -34,7 +33,6 @@ private FeedRangeCompositeContinuation(
: base(containerRid, feedRange)
{
this.CompositeContinuationTokens = new Queue<CompositeContinuationToken>();
this.doneRanges = new HashSet<string>();
}

public override void Accept(
Expand Down Expand Up @@ -100,7 +98,7 @@ public FeedRangeCompositeContinuation(
this.CurrentToken = this.CompositeContinuationTokens.Peek();
}

public override string GetContinuation() => this.CurrentToken.Token;
public override string GetContinuation() => this.CurrentToken?.Token;

public override string ToString()
{
Expand All @@ -109,14 +107,6 @@ public override string ToString()

public override void ReplaceContinuation(string continuationToken)
{
if (continuationToken == null)
{
// Normal ReadFeed can signal termination by CT null, not NotModified
// Change Feed never lands here, as it always provides a CT
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.doneRanges.Add(this.CurrentToken.Range.Min);
}

this.CurrentToken.Token = continuationToken;
this.MoveToNextToken();
}
Expand All @@ -140,7 +130,7 @@ public override TryCatch ValidateContainer(string containerRid)
/// <summary>
/// The concept of Done is only for ReadFeed. Change Feed is never done, it is an infinite stream.
/// </summary>
public override bool IsDone => this.doneRanges.Count == this.CompositeContinuationTokens.Count;
public override bool IsDone => this.CompositeContinuationTokens.Count == 0;

public override Documents.ShouldRetryResult HandleChangeFeedNotModified(ResponseMessage responseMessage)
{
Expand Down Expand Up @@ -208,6 +198,38 @@ public override Documents.ShouldRetryResult HandleChangeFeedNotModified(Response
}
}

private static bool TryParseAsCompositeContinuationToken(
string providedContinuation,
out CompositeContinuationToken compositeContinuationToken)
{
compositeContinuationToken = null;
try
{
if (providedContinuation.Trim().StartsWith("[", StringComparison.Ordinal))
{
List<CompositeContinuationToken> compositeContinuationTokens = JsonConvert.DeserializeObject<List<CompositeContinuationToken>>(providedContinuation);

if (compositeContinuationTokens != null && compositeContinuationTokens.Count > 0)
{
compositeContinuationToken = compositeContinuationTokens[0];
}

return compositeContinuationToken != null;
}
else if (providedContinuation.Trim().StartsWith("{", StringComparison.Ordinal))
{
compositeContinuationToken = JsonConvert.DeserializeObject<CompositeContinuationToken>(providedContinuation);
return compositeContinuationToken != null;
}

return false;
}
catch (JsonException)
{
return false;
}
}

private static CompositeContinuationToken CreateCompositeContinuationTokenForRange(
string minInclusive,
string maxExclusive,
Expand All @@ -223,32 +245,52 @@ private static CompositeContinuationToken CreateCompositeContinuationTokenForRan
private void MoveToNextToken()
{
CompositeContinuationToken recentToken = this.CompositeContinuationTokens.Dequeue();
this.CompositeContinuationTokens.Enqueue(recentToken);
this.CurrentToken = this.CompositeContinuationTokens.Peek();

// In a Query / ReadFeed not Change Feed, skip ranges that are done to avoid requests
while (!this.IsDone &&
this.doneRanges.Contains(this.CurrentToken.Range.Min))
if (recentToken.Token != null)
{
this.MoveToNextToken();
// Normal ReadFeed can signal termination by CT null, not NotModified
// Change Feed never lands here, as it always provides a CT
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.CompositeContinuationTokens.Enqueue(recentToken);
}

this.CurrentToken = this.CompositeContinuationTokens.Count > 0 ? this.CompositeContinuationTokens.Peek() : null;
}

private void CreateChildRanges(IReadOnlyList<Documents.PartitionKeyRange> keyRanges)
{
if (keyRanges == null) throw new ArgumentNullException(nameof(keyRanges));

// Update current
Documents.PartitionKeyRange firstRange = keyRanges[0];
this.CurrentToken.Range = new Documents.Routing.Range<string>(firstRange.MinInclusive, firstRange.MaxExclusive, true, false);
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
if (FeedRangeCompositeContinuation.TryParseAsCompositeContinuationToken(
this.CurrentToken.Token,
out CompositeContinuationToken continuationAsComposite))
{
this.CompositeContinuationTokens.Enqueue(
FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange(
keyRange.MinInclusive,
keyRange.MaxExclusive,
this.CurrentToken.Token));
// Update the internal composite continuation
continuationAsComposite.Range = this.CurrentToken.Range;
this.CurrentToken.Token = JsonConvert.SerializeObject(continuationAsComposite);
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
{
continuationAsComposite.Range = keyRange.ToRange();
this.CompositeContinuationTokens.Enqueue(
FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange(
keyRange.MinInclusive,
keyRange.MaxExclusive,
JsonConvert.SerializeObject(continuationAsComposite)));
}
}
else
{
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
{
this.CompositeContinuationTokens.Enqueue(
FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange(
keyRange.MinInclusive,
keyRange.MaxExclusive,
this.CurrentToken.Token));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal sealed class FeedRangeEPK : FeedRangeInternal
{
public Documents.Routing.Range<string> Range { get; }

public static FeedRangeEPK ForCompleteRange()
public static FeedRangeEPK ForFullRange()
{
return new FeedRangeEPK(new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Json.Interop;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
Expand Down Expand Up @@ -46,7 +43,7 @@ public static FeedRangeIteratorCore Create(
}

// Backward compatible with old format
feedRangeInternal = FeedRangeEPK.ForCompleteRange();
feedRangeInternal = FeedRangeEPK.ForFullRange();
feedRangeContinuation = new FeedRangeCompositeContinuation(
string.Empty,
feedRangeInternal,
Expand All @@ -62,7 +59,7 @@ public static FeedRangeIteratorCore Create(
return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options);
}

feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForCompleteRange();
feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange();
return new FeedRangeIteratorCore(containerCore, feedRangeInternal, options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ChangeFeedIteratorCore Create(
}
}

feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForCompleteRange();
feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange();
return new ChangeFeedIteratorCore(container, feedRangeInternal, changeFeedRequestOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
public class FeedRangeContinuationTests
Expand Down Expand Up @@ -98,12 +99,16 @@ public void FeedRangeCompositeContinuation_ShouldRetry()

ResponseMessage okResponse = new ResponseMessage(HttpStatusCode.OK);
okResponse.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1";
okResponse.Headers[Documents.HttpConstants.HttpHeaders.ETag] = "1";
Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(okResponse).ShouldRetry);

ResponseMessage notModified = new ResponseMessage(HttpStatusCode.NotModified);
notModified.Headers[Documents.HttpConstants.HttpHeaders.ETag] = "1";

// A 304 on a multi Range token should cycle on all available ranges before stopping retrying
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry);
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry);
Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry);
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry);
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry);
Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry);
}

[TestMethod]
Expand Down Expand Up @@ -153,6 +158,55 @@ public async Task FeedRangeCompositeContinuation_HandleSplits()
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[2].Range.Max);
}

[TestMethod]
public async Task FeedRangeCompositeContinuation_HandleSplits_ReadFeed()
{
List<CompositeContinuationToken> compositeContinuationTokens = new List<CompositeContinuationToken>()
{
FeedRangeContinuationTests.BuildTokenForRange("A", "C", JsonConvert.SerializeObject(new CompositeContinuationToken() { Token = "token1", Range = new Documents.Routing.Range<string>("A", "C", true, false) })),
FeedRangeContinuationTests.BuildTokenForRange("C", "F", JsonConvert.SerializeObject(new CompositeContinuationToken() { Token = "token2", Range = new Documents.Routing.Range<string>("C", "F", true, false) })),
};

FeedRangeCompositeContinuation feedRangeCompositeContinuation = new FeedRangeCompositeContinuation(Guid.NewGuid().ToString(), Mock.Of<FeedRangeInternal>(), JsonConvert.DeserializeObject<List<CompositeContinuationToken>>(JsonConvert.SerializeObject(compositeContinuationTokens)));

MultiRangeMockDocumentClient documentClient = new MultiRangeMockDocumentClient();

Mock<CosmosClientContext> cosmosClientContext = new Mock<CosmosClientContext>();
cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions());
cosmosClientContext.Setup(c => c.DocumentClient).Returns(documentClient);

Mock<ContainerInternal> containerCore = new Mock<ContainerInternal>();
containerCore
.Setup(c => c.ClientContext).Returns(cosmosClientContext.Object);

Assert.AreEqual(2, feedRangeCompositeContinuation.CompositeContinuationTokens.Count);

ResponseMessage split = new ResponseMessage(HttpStatusCode.Gone);
split.Headers.SubStatusCode = Documents.SubStatusCodes.PartitionKeyRangeGone;
Assert.IsTrue((await feedRangeCompositeContinuation.HandleSplitAsync(containerCore.Object, split, default(CancellationToken))).ShouldRetry);

// verify token state
// Split should have updated initial and created a new token at the end
Assert.AreEqual(3, feedRangeCompositeContinuation.CompositeContinuationTokens.Count);
CompositeContinuationToken[] continuationTokens = feedRangeCompositeContinuation.CompositeContinuationTokens.ToArray();
// First token is split
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Range.Min, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[0].Token).Range.Min);
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Token, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[0].Token).Token);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[0].MinInclusive, continuationTokens[0].Range.Min);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[0].MaxExclusive, continuationTokens[0].Range.Max);

// Second token remains the same
Assert.AreEqual(compositeContinuationTokens[1].Token, continuationTokens[1].Token);
Assert.AreEqual(compositeContinuationTokens[1].Range.Min, continuationTokens[1].Range.Min);
Assert.AreEqual(compositeContinuationTokens[1].Range.Max, continuationTokens[1].Range.Max);

// New third token
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Range.Max, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[2].Token).Range.Max);
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Token, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[2].Token).Token);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MinInclusive, continuationTokens[2].Range.Min);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[2].Range.Max);
}

[TestMethod]
public void FeedRangeCompositeContinuation_IsDone()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void ReadFeedIteratorCore_Create_Default()
{
FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of<ContainerInternal>(), null, null, null);
FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK;
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max);
Assert.IsNull(feedTokenIterator.FeedRangeContinuation);
}

Expand All @@ -53,8 +53,8 @@ public void ReadFeedIteratorCore_Create_WithContinuation()
string continuation = Guid.NewGuid().ToString();
FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of<ContainerInternal>(), null, continuation, null);
FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK;
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max);
Assert.IsNotNull(feedTokenIterator.FeedRangeContinuation);
Assert.AreEqual(continuation, feedTokenIterator.FeedRangeContinuation.GetContinuation());
}
Expand All @@ -64,12 +64,12 @@ public void ReadFeedIteratorCore_Create_WithFeedContinuation()
{

string continuation = Guid.NewGuid().ToString();
FeedRangeEPK feedRangeEPK = FeedRangeEPK.ForCompleteRange();
FeedRangeEPK feedRangeEPK = FeedRangeEPK.ForFullRange();
FeedRangeCompositeContinuation feedRangeSimpleContinuation = new FeedRangeCompositeContinuation(Guid.NewGuid().ToString(), feedRangeEPK, new List<Documents.Routing.Range<string>>() { feedRangeEPK.Range }, continuation);
FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of<ContainerInternal>(), null, feedRangeSimpleContinuation.ToString(), null);
FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK;
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max);
Assert.IsNotNull(feedTokenIterator.FeedRangeContinuation);
Assert.AreEqual(continuation, feedTokenIterator.FeedRangeContinuation.GetContinuation());
}
Expand Down

0 comments on commit 74141c9

Please sign in to comment.