Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FeedRange: Fix split handling for readfeed split #1547

Merged
merged 13 commits into from
Jun 11, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ internal sealed class FeedRangeCompositeContinuation : FeedRangeContinuation
public CompositeContinuationToken CurrentToken { get; private set; }
private static Documents.ShouldRetryResult Retry = Documents.ShouldRetryResult.RetryAfter(TimeSpan.Zero);
private static Documents.ShouldRetryResult NoRetry = Documents.ShouldRetryResult.NoRetry();
private readonly HashSet<string> doneRanges;
ealsur marked this conversation as resolved.
Show resolved Hide resolved
private string initialNoResultsRange;

private FeedRangeCompositeContinuation(
Expand All @@ -34,7 +33,6 @@ private FeedRangeCompositeContinuation(
: base(containerRid, feedRange)
{
this.CompositeContinuationTokens = new Queue<CompositeContinuationToken>();
this.doneRanges = new HashSet<string>();
}

public override void Accept(
Expand Down Expand Up @@ -100,7 +98,7 @@ public FeedRangeCompositeContinuation(
this.CurrentToken = this.CompositeContinuationTokens.Peek();
}

public override string GetContinuation() => this.CurrentToken.Token;
public override string GetContinuation() => this.CurrentToken?.Token;

public override string ToString()
{
Expand All @@ -109,14 +107,6 @@ public override string ToString()

public override void ReplaceContinuation(string continuationToken)
{
if (continuationToken == null)
{
// Normal ReadFeed can signal termination by CT null, not NotModified
// Change Feed never lands here, as it always provides a CT
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.doneRanges.Add(this.CurrentToken.Range.Min);
}

this.CurrentToken.Token = continuationToken;
this.MoveToNextToken();
}
Expand All @@ -140,7 +130,7 @@ public override TryCatch ValidateContainer(string containerRid)
/// <summary>
/// The concept of Done is only for ReadFeed. Change Feed is never done, it is an infinite stream.
/// </summary>
public override bool IsDone => this.doneRanges.Count == this.CompositeContinuationTokens.Count;
public override bool IsDone => this.CompositeContinuationTokens.Count == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public override bool IsDone => this.CompositeContinuationTokens.Count == 0;
public override bool IsDone => this.CompositeContinuationTokens.Any();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this would do the opposite. Done is if there are no more items in the queue, the change with Any would return true if there is at least one.


public override Documents.ShouldRetryResult HandleChangeFeedNotModified(ResponseMessage responseMessage)
{
Expand Down Expand Up @@ -208,6 +198,38 @@ public override Documents.ShouldRetryResult HandleChangeFeedNotModified(Response
}
}

private static bool TryParseAsCompositeContinuationToken(
string providedContinuation,
out CompositeContinuationToken compositeContinuationToken)
{
compositeContinuationToken = null;
try
{
if (providedContinuation.Trim().StartsWith("[", StringComparison.Ordinal))
{
List<CompositeContinuationToken> compositeContinuationTokens = JsonConvert.DeserializeObject<List<CompositeContinuationToken>>(providedContinuation);

if (compositeContinuationTokens != null && compositeContinuationTokens.Count > 0)
{
compositeContinuationToken = compositeContinuationTokens[0];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to the other composite continuation tokens? Shouldn't this output a list of composite continuation tokens or should it be renamed to TryParseFirstCompositeContinuationToken?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same logic that exists in PartitionKeyRangeHandler, the list is a backward compatible measure, it contains 1 item

}

return compositeContinuationToken != null;
}
else if (providedContinuation.Trim().StartsWith("{", StringComparison.Ordinal))
{
compositeContinuationToken = JsonConvert.DeserializeObject<CompositeContinuationToken>(providedContinuation);
return compositeContinuationToken != null;
}

return false;
}
catch (JsonException)
{
return false;
}
}

private static CompositeContinuationToken CreateCompositeContinuationTokenForRange(
string minInclusive,
string maxExclusive,
Expand All @@ -223,32 +245,52 @@ private static CompositeContinuationToken CreateCompositeContinuationTokenForRan
private void MoveToNextToken()
{
CompositeContinuationToken recentToken = this.CompositeContinuationTokens.Dequeue();
ealsur marked this conversation as resolved.
Show resolved Hide resolved
this.CompositeContinuationTokens.Enqueue(recentToken);
this.CurrentToken = this.CompositeContinuationTokens.Peek();

// In a Query / ReadFeed not Change Feed, skip ranges that are done to avoid requests
while (!this.IsDone &&
this.doneRanges.Contains(this.CurrentToken.Range.Min))
if (recentToken.Token != null)
{
this.MoveToNextToken();
// Normal ReadFeed can signal termination by CT null, not NotModified
ealsur marked this conversation as resolved.
Show resolved Hide resolved
// Change Feed never lands here, as it always provides a CT
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.CompositeContinuationTokens.Enqueue(recentToken);
}

this.CurrentToken = this.CompositeContinuationTokens.Count > 0 ? this.CompositeContinuationTokens.Peek() : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.CurrentToken = this.CompositeContinuationTokens.Count > 0 ? this.CompositeContinuationTokens.Peek() : null;
if(!this.CompositeContinuationTokens.TryPeek(out this.CurrentToken)){
this.CurrentToken = null;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am tempted to pick this change, but I already know the current state passes all expected validations. I would need to validate splits on master all over with any code changes and this is mainly a readability optimization

}

private void CreateChildRanges(IReadOnlyList<Documents.PartitionKeyRange> keyRanges)
{
if (keyRanges == null) throw new ArgumentNullException(nameof(keyRanges));

// Update current
Documents.PartitionKeyRange firstRange = keyRanges[0];
this.CurrentToken.Range = new Documents.Routing.Range<string>(firstRange.MinInclusive, firstRange.MaxExclusive, true, false);
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
if (FeedRangeCompositeContinuation.TryParseAsCompositeContinuationToken(
ealsur marked this conversation as resolved.
Show resolved Hide resolved
ealsur marked this conversation as resolved.
Show resolved Hide resolved
this.CurrentToken.Token,
out CompositeContinuationToken continuationAsComposite))
{
this.CompositeContinuationTokens.Enqueue(
FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange(
keyRange.MinInclusive,
keyRange.MaxExclusive,
this.CurrentToken.Token));
// Update the internal composite continuation
continuationAsComposite.Range = this.CurrentToken.Range;
this.CurrentToken.Token = JsonConvert.SerializeObject(continuationAsComposite);
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
{
continuationAsComposite.Range = keyRange.ToRange();
this.CompositeContinuationTokens.Enqueue(
FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange(
keyRange.MinInclusive,
keyRange.MaxExclusive,
JsonConvert.SerializeObject(continuationAsComposite)));
}
}
else
{
// Add children
foreach (Documents.PartitionKeyRange keyRange in keyRanges.Skip(1))
{
this.CompositeContinuationTokens.Enqueue(
FeedRangeCompositeContinuation.CreateCompositeContinuationTokenForRange(
keyRange.MinInclusive,
keyRange.MaxExclusive,
this.CurrentToken.Token));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal sealed class FeedRangeEPK : FeedRangeInternal
{
public Documents.Routing.Range<string> Range { get; }

public static FeedRangeEPK ForCompleteRange()
public static FeedRangeEPK ForFullRange()
{
return new FeedRangeEPK(new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Json;
using Microsoft.Azure.Cosmos.Json.Interop;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
Expand Down Expand Up @@ -46,7 +43,7 @@ public static FeedRangeIteratorCore Create(
}

// Backward compatible with old format
feedRangeInternal = FeedRangeEPK.ForCompleteRange();
feedRangeInternal = FeedRangeEPK.ForFullRange();
feedRangeContinuation = new FeedRangeCompositeContinuation(
string.Empty,
feedRangeInternal,
Expand All @@ -62,7 +59,7 @@ public static FeedRangeIteratorCore Create(
return new FeedRangeIteratorCore(containerCore, feedRangeContinuation, options);
}

feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForCompleteRange();
feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange();
return new FeedRangeIteratorCore(containerCore, feedRangeInternal, options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ChangeFeedIteratorCore Create(
}
}

feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForCompleteRange();
feedRangeInternal = feedRangeInternal ?? FeedRangeEPK.ForFullRange();
return new ChangeFeedIteratorCore(container, feedRangeInternal, changeFeedRequestOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.Tests.FeedRange
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using Newtonsoft.Json;

[TestClass]
public class FeedRangeContinuationTests
Expand Down Expand Up @@ -98,12 +99,16 @@ public void FeedRangeCompositeContinuation_ShouldRetry()

ResponseMessage okResponse = new ResponseMessage(HttpStatusCode.OK);
okResponse.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1";
okResponse.Headers[Documents.HttpConstants.HttpHeaders.ETag] = "1";
Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(okResponse).ShouldRetry);

ResponseMessage notModified = new ResponseMessage(HttpStatusCode.NotModified);
notModified.Headers[Documents.HttpConstants.HttpHeaders.ETag] = "1";

// A 304 on a multi Range token should cycle on all available ranges before stopping retrying
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry);
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry);
Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(new ResponseMessage(HttpStatusCode.NotModified)).ShouldRetry);
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry);
Assert.IsTrue(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry);
Assert.IsFalse(feedRangeCompositeContinuation.HandleChangeFeedNotModified(notModified).ShouldRetry);
}

[TestMethod]
Expand Down Expand Up @@ -153,6 +158,55 @@ public async Task FeedRangeCompositeContinuation_HandleSplits()
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[2].Range.Max);
}

[TestMethod]
public async Task FeedRangeCompositeContinuation_HandleSplits_ReadFeed()
{
List<CompositeContinuationToken> compositeContinuationTokens = new List<CompositeContinuationToken>()
{
FeedRangeContinuationTests.BuildTokenForRange("A", "C", JsonConvert.SerializeObject(new CompositeContinuationToken() { Token = "token1", Range = new Documents.Routing.Range<string>("A", "C", true, false) })),
FeedRangeContinuationTests.BuildTokenForRange("C", "F", JsonConvert.SerializeObject(new CompositeContinuationToken() { Token = "token2", Range = new Documents.Routing.Range<string>("C", "F", true, false) })),
};

FeedRangeCompositeContinuation feedRangeCompositeContinuation = new FeedRangeCompositeContinuation(Guid.NewGuid().ToString(), Mock.Of<FeedRangeInternal>(), JsonConvert.DeserializeObject<List<CompositeContinuationToken>>(JsonConvert.SerializeObject(compositeContinuationTokens)));

MultiRangeMockDocumentClient documentClient = new MultiRangeMockDocumentClient();

Mock<CosmosClientContext> cosmosClientContext = new Mock<CosmosClientContext>();
cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions());
cosmosClientContext.Setup(c => c.DocumentClient).Returns(documentClient);

Mock<ContainerInternal> containerCore = new Mock<ContainerInternal>();
containerCore
.Setup(c => c.ClientContext).Returns(cosmosClientContext.Object);

Assert.AreEqual(2, feedRangeCompositeContinuation.CompositeContinuationTokens.Count);

ResponseMessage split = new ResponseMessage(HttpStatusCode.Gone);
split.Headers.SubStatusCode = Documents.SubStatusCodes.PartitionKeyRangeGone;
Assert.IsTrue((await feedRangeCompositeContinuation.HandleSplitAsync(containerCore.Object, split, default(CancellationToken))).ShouldRetry);

// verify token state
// Split should have updated initial and created a new token at the end
Assert.AreEqual(3, feedRangeCompositeContinuation.CompositeContinuationTokens.Count);
CompositeContinuationToken[] continuationTokens = feedRangeCompositeContinuation.CompositeContinuationTokens.ToArray();
// First token is split
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Range.Min, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[0].Token).Range.Min);
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Token, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[0].Token).Token);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[0].MinInclusive, continuationTokens[0].Range.Min);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[0].MaxExclusive, continuationTokens[0].Range.Max);

// Second token remains the same
Assert.AreEqual(compositeContinuationTokens[1].Token, continuationTokens[1].Token);
Assert.AreEqual(compositeContinuationTokens[1].Range.Min, continuationTokens[1].Range.Min);
Assert.AreEqual(compositeContinuationTokens[1].Range.Max, continuationTokens[1].Range.Max);

// New third token
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Range.Max, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[2].Token).Range.Max);
Assert.AreEqual(JsonConvert.DeserializeObject<CompositeContinuationToken>(compositeContinuationTokens[0].Token).Token, JsonConvert.DeserializeObject<CompositeContinuationToken>(continuationTokens[2].Token).Token);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MinInclusive, continuationTokens[2].Range.Min);
Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[2].Range.Max);
}

[TestMethod]
public void FeedRangeCompositeContinuation_IsDone()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public void ReadFeedIteratorCore_Create_Default()
{
FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of<ContainerInternal>(), null, null, null);
FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK;
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max);
Assert.IsNull(feedTokenIterator.FeedRangeContinuation);
}

Expand All @@ -53,8 +53,8 @@ public void ReadFeedIteratorCore_Create_WithContinuation()
string continuation = Guid.NewGuid().ToString();
FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of<ContainerInternal>(), null, continuation, null);
FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK;
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max);
Assert.IsNotNull(feedTokenIterator.FeedRangeContinuation);
Assert.AreEqual(continuation, feedTokenIterator.FeedRangeContinuation.GetContinuation());
}
Expand All @@ -64,12 +64,12 @@ public void ReadFeedIteratorCore_Create_WithFeedContinuation()
{

string continuation = Guid.NewGuid().ToString();
FeedRangeEPK feedRangeEPK = FeedRangeEPK.ForCompleteRange();
FeedRangeEPK feedRangeEPK = FeedRangeEPK.ForFullRange();
FeedRangeCompositeContinuation feedRangeSimpleContinuation = new FeedRangeCompositeContinuation(Guid.NewGuid().ToString(), feedRangeEPK, new List<Documents.Routing.Range<string>>() { feedRangeEPK.Range }, continuation);
FeedRangeIteratorCore feedTokenIterator = FeedRangeIteratorCore.Create(Mock.Of<ContainerInternal>(), null, feedRangeSimpleContinuation.ToString(), null);
FeedRangeEPK defaultRange = feedTokenIterator.FeedRangeInternal as FeedRangeEPK;
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForCompleteRange().Range.Max, defaultRange.Range.Max);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Min, defaultRange.Range.Min);
Assert.AreEqual(FeedRangeEPK.ForFullRange().Range.Max, defaultRange.Range.Max);
Assert.IsNotNull(feedTokenIterator.FeedRangeContinuation);
Assert.AreEqual(continuation, feedTokenIterator.FeedRangeContinuation.GetContinuation());
}
Expand Down