Skip to content

Commit

Permalink
Preview - FeedToken support for ReadFeed (#1230)
Browse files Browse the repository at this point in the history
* GetTokens implementation

* ChangeFeed implementation

* Rename

* FeedTokenIterator

* GetChangeFeedStreamIterator

* Bucket extension

* Using bucket

* FeedTokenIterator of T

* Fixing bucket

* Change Feed for PK

* Emulator tests

* Feedtoken for PKRangeId

* Tests

* FromString

* GetPartitionKeyRanges

* Preview only

* Comments

* Bucket tests

* Comments

* Removing FeedTokenIterator

* Removing Bucket util

* undo file

* Refactor access

* Initial

* FeedTokenIteratorCore

* Tests

* Initial ReadFeed tests

* New CF test

* Retry on empty OK

* IsDone

* IsDone fix

* IsDone tests

* Accessors for INTERNAL

* More tests

* Fixes

* Refactoring for ReadFeed Iterator

* Possible PK

* Conditional behavior

* Simplifying

* FeedToken on non-partitioned resources

* Removing FeedTokenIteratorCore

* Adding more tests

* Of T

* diagnostics

* More tests

* merge with master

* Adding queryText

* Note

* Continuation update

* Adding test

* Using TryCatch

* Update Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs

Co-Authored-By: j82w <j82w@users.noreply.github.com>

* Comments

* Nullhandling

* Adding Gone passthrough and tests

* Removing duplicated code

* Adding more tests

* OverallScope

* Nullcheck

* Test

* comments

* Removing PKRangeGoneRetryHandler

* is pattern

* ReadFeed with PK test

* EPKString check

* More tests

* exception handling

* Diagnostics rename

* usings

* merge conflicts

* comments

Co-authored-by: j82w <j82w@users.noreply.github.com>
  • Loading branch information
ealsur and j82w committed Mar 13, 2020
1 parent e3e23a5 commit 1e6051d
Show file tree
Hide file tree
Showing 28 changed files with 1,672 additions and 237 deletions.
18 changes: 18 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientResources.Designer.cs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Microsoft.Azure.Cosmos/src/ClientResources.resx
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,10 @@
<data name="FeedToken_CannotParse" xml:space="preserve">
<value>Cannot parse '{0}' as a valid FeedToken.</value>
</data>
<data name="FeedToken_EffectivePartitionKeyRouting" xml:space="preserve">
<value>Cannot define EffectivePartitionKeyRouting and FeedToken simultaneously.</value>
</data>
<data name="FeedToken_InvalidImplementation" xml:space="preserve">
<value>Expected FeedTokenInternal instance.</value>
</data>
</root>
14 changes: 7 additions & 7 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -864,13 +864,13 @@ private FeedIteratorInternal GetDatabaseQueryStreamIteratorHelper(
QueryRequestOptions requestOptions = null)
{
this.ThrowIfDisposed();
return new FeedIteratorCore(
this.ClientContext,
this.DatabaseRootUri,
ResourceType.Database,
queryDefinition,
continuationToken,
requestOptions);
return FeedIteratorCore.CreateForNonPartitionedResource(
clientContext: this.ClientContext,
resourceLink: this.DatabaseRootUri,
resourceType: ResourceType.Database,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
options: requestOptions);
}

/// <summary>
Expand Down
155 changes: 137 additions & 18 deletions Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ namespace Microsoft.Azure.Cosmos
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Resource.CosmosExceptions;
using Microsoft.Azure.Documents;
using static Microsoft.Azure.Documents.RuntimeConstants;

Expand All @@ -20,25 +22,75 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class FeedIteratorCore : FeedIteratorInternal
{
private readonly ContainerCore containerCore;
private readonly CosmosClientContext clientContext;
private readonly Uri resourceLink;
private readonly ResourceType resourceType;
private readonly SqlQuerySpec querySpec;
private bool hasMoreResultsInternal;
private FeedTokenInternal feedTokenInternal;

public FeedIteratorCore(
internal static FeedIteratorCore CreateForNonPartitionedResource(
CosmosClientContext clientContext,
Uri resourceLink,
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
QueryRequestOptions options)
{
return new FeedIteratorCore(
clientContext: clientContext,
containerCore: null,
resourceLink: resourceLink,
resourceType: resourceType,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
feedTokenInternal: null,
options: options);
}

internal static FeedIteratorCore CreateForPartitionedResource(
ContainerCore containerCore,
Uri resourceLink,
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
FeedTokenInternal feedTokenInternal,
QueryRequestOptions options)
{
if (containerCore == null)
{
throw new ArgumentNullException(nameof(containerCore));
}

return new FeedIteratorCore(
containerCore: containerCore,
clientContext: containerCore.ClientContext,
resourceLink: resourceLink,
resourceType: resourceType,
queryDefinition: queryDefinition,
continuationToken: continuationToken,
feedTokenInternal: feedTokenInternal,
options: options);
}

private FeedIteratorCore(
ContainerCore containerCore,
CosmosClientContext clientContext,
Uri resourceLink,
ResourceType resourceType,
QueryDefinition queryDefinition,
string continuationToken,
FeedTokenInternal feedTokenInternal,
QueryRequestOptions options)
{
this.resourceLink = resourceLink;
this.containerCore = containerCore;
this.clientContext = clientContext;
this.resourceType = resourceType;
this.querySpec = queryDefinition?.ToSqlQuerySpec();
this.ContinuationToken = continuationToken;
this.feedTokenInternal = feedTokenInternal;
this.ContinuationToken = continuationToken ?? this.feedTokenInternal?.GetContinuation();
this.requestOptions = options;
this.hasMoreResultsInternal = true;
}
Expand All @@ -50,7 +102,7 @@ public override
#else
internal
#endif
FeedToken FeedToken => throw new NotImplementedException();
FeedToken FeedToken => this.feedTokenInternal;

/// <summary>
/// The query options for the result set
Expand All @@ -67,18 +119,48 @@ public override
/// </summary>
/// <param name="cancellationToken">(Optional) <see cref="CancellationToken"/> representing request cancellation.</param>
/// <returns>A query response from cosmos service</returns>
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
CosmosDiagnosticsContext diagnostics = CosmosDiagnosticsContext.Create(this.requestOptions);
using (diagnostics.CreateOverallScope("FeedReadNextAsync"))
{
return this.ReadNextInternalAsync(diagnostics, cancellationToken);
}
}

private async Task<ResponseMessage> ReadNextInternalAsync(
CosmosDiagnosticsContext diagnostics,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

Stream stream = null;
OperationType operation = OperationType.ReadFeed;
if (this.querySpec != null)
{
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType);
operation = OperationType.Query;
}

if (this.feedTokenInternal == null)
{
TryCatch<FeedTokenInternal> tryCatchFeedTokeninternal = await this.TryInitializeFeedTokenAsync(cancellationToken);
if (!tryCatchFeedTokeninternal.Succeeded)
{
if (tryCatchFeedTokeninternal.Exception.InnerException is CosmosException cosmosException)
{
return cosmosException.ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnostics));
}

return CosmosExceptionFactory.CreateInternalServerErrorException(
message: tryCatchFeedTokeninternal.Exception.InnerException.Message,
innerException: tryCatchFeedTokeninternal.Exception.InnerException,
diagnosticsContext: diagnostics).ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnostics));
}

this.feedTokenInternal = tryCatchFeedTokeninternal.Result;
}

ResponseMessage response = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.resourceLink,
resourceType: this.resourceType,
Expand All @@ -95,26 +177,63 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
request.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson);
request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
}
this.feedTokenInternal?.EnrichRequest(request);
},
diagnosticsScope: null,
diagnosticsScope: diagnostics,
cancellationToken: cancellationToken);

this.ContinuationToken = response.Headers.ContinuationToken;
this.hasMoreResultsInternal = GetHasMoreResults(this.ContinuationToken, response.StatusCode);
// Retry in case of splits or other scenarios only on partitioned resources
if (this.containerCore != null
&& await this.feedTokenInternal.ShouldRetryAsync(this.containerCore, response, cancellationToken))
{
return await this.ReadNextInternalAsync(diagnostics, cancellationToken);
}

if (response.IsSuccessStatusCode)
{
this.feedTokenInternal.UpdateContinuation(response.Headers.ContinuationToken);
this.ContinuationToken = this.feedTokenInternal.GetContinuation();
this.hasMoreResultsInternal = !this.feedTokenInternal.IsDone;
}
else
{
this.hasMoreResultsInternal = false;
}

return response;
}

internal static string GetContinuationToken(ResponseMessage httpResponseMessage)
private async Task<TryCatch<FeedTokenInternal>> TryInitializeFeedTokenAsync(CancellationToken cancellationToken)
{
return httpResponseMessage.Headers.ContinuationToken;
}
string containerRId = string.Empty;
if (this.containerCore != null)
{
try
{
containerRId = await this.containerCore.GetRIDAsync(cancellationToken);
}
catch (Exception cosmosException)
{
return TryCatch<FeedTokenInternal>.FromException(cosmosException);
}
}

internal static bool GetHasMoreResults(string continuationToken, HttpStatusCode statusCode)
{
// this logic might not be sufficient composite continuation token https://msdata.visualstudio.com/CosmosDB/SDK/_workitems/edit/269099
// in the case where this is a result set iterator for a change feed, not modified indicates that
// the enumeration is done for now.
return continuationToken != null && statusCode != HttpStatusCode.NotModified;
// Create FeedToken for the full Range
FeedTokenEPKRange feedTokenInternal = new FeedTokenEPKRange(
containerRId,
new PartitionKeyRange()
{
MinInclusive = Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
MaxExclusive = Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey
});
// Initialize with the ContinuationToken that the user passed, if any
if (this.ContinuationToken != null)
{
feedTokenInternal.UpdateContinuation(this.ContinuationToken);
}

return TryCatch<FeedTokenInternal>.FromResult(feedTokenInternal);
}

public override CosmosElement GetCosmsoElementContinuationToken()
Expand Down Expand Up @@ -164,4 +283,4 @@ public override async Task<FeedResponse<T>> ReadNextAsync(CancellationToken canc
return this.responseCreator(response);
}
}
}
}
42 changes: 35 additions & 7 deletions Microsoft.Azure.Cosmos/src/FeedTokenEPKRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ internal sealed class FeedTokenEPKRange : FeedTokenInternal
{
internal readonly Queue<CompositeContinuationToken> CompositeContinuationTokens;
internal readonly Documents.Routing.Range<string> CompleteRange;
private readonly HashSet<string> doneRanges;
private CompositeContinuationToken currentToken;
private string initialNotModifiedRange;
private string initialNoResultsRange;

private FeedTokenEPKRange(
string containerRid)
: base(containerRid)
{
this.CompositeContinuationTokens = new Queue<CompositeContinuationToken>();
this.doneRanges = new HashSet<string>();
}

private FeedTokenEPKRange(
Expand Down Expand Up @@ -122,8 +124,12 @@ public override void EnrichRequest(RequestMessage request)
throw new ArgumentNullException(nameof(request));
}

request.Properties[HandlerConstants.StartEpkString] = this.currentToken.Range.Min;
request.Properties[HandlerConstants.EndEpkString] = this.currentToken.Range.Max;
// in case EPK has already been set
if (!request.Properties.ContainsKey(HandlerConstants.StartEpkString))
{
request.Properties[HandlerConstants.StartEpkString] = this.currentToken.Range.Min;
request.Properties[HandlerConstants.EndEpkString] = this.currentToken.Range.Max;
}
}

public override string GetContinuation() => this.currentToken.Token;
Expand All @@ -135,33 +141,48 @@ public override string ToString()

public override void UpdateContinuation(string continuationToken)
{
if (continuationToken == null)
{
// Queries and normal ReadFeed can signal termination by CT null, not NotModified
// Change Feed never lands here, as it always provides a CT
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.doneRanges.Add(this.currentToken.Range.Min);
}

this.currentToken.Token = continuationToken;
this.MoveToNextToken();
}

/// <summary>
/// The concept of Done is only for Query and ReadFeed. Change Feed is never done, it is an infinite stream.
/// </summary>
public override bool IsDone => this.doneRanges.Count == this.CompositeContinuationTokens.Count;

public override async Task<bool> ShouldRetryAsync(
ContainerCore containerCore,
ResponseMessage responseMessage,
CancellationToken cancellationToken = default(CancellationToken))
{
if (responseMessage.IsSuccessStatusCode)
{
this.initialNotModifiedRange = null;
this.initialNoResultsRange = null;
return false;
}

// If the current response is NotModified (ChangeFeed), try and skip to a next one
if (responseMessage.StatusCode == HttpStatusCode.NotModified
&& this.CompositeContinuationTokens.Count > 1)
{
if (this.initialNotModifiedRange == null)
if (this.initialNoResultsRange == null)
{
this.initialNotModifiedRange = this.currentToken.Range.Min;
this.initialNoResultsRange = this.currentToken.Range.Min;
return true;
}

return !this.initialNotModifiedRange.Equals(this.currentToken.Range.Min, StringComparison.OrdinalIgnoreCase);
return !this.initialNoResultsRange.Equals(this.currentToken.Range.Min, StringComparison.OrdinalIgnoreCase);
}

// Split handling
bool partitionSplit = responseMessage.StatusCode == HttpStatusCode.Gone
&& (responseMessage.Headers.SubStatusCode == Documents.SubStatusCodes.PartitionKeyRangeGone || responseMessage.Headers.SubStatusCode == Documents.SubStatusCodes.CompletingSplit);
if (partitionSplit)
Expand Down Expand Up @@ -220,6 +241,13 @@ private void MoveToNextToken()
CompositeContinuationToken recentToken = this.CompositeContinuationTokens.Dequeue();
this.CompositeContinuationTokens.Enqueue(recentToken);
this.currentToken = this.CompositeContinuationTokens.Peek();

// In a Query / ReadFeed not Change Feed, skip ranges that are done to avoid requests
while (!this.IsDone &&
this.doneRanges.Contains(this.currentToken.Range.Min))
{
this.MoveToNextToken();
}
}

private void HandleSplit(IReadOnlyList<Documents.PartitionKeyRange> keyRanges)
Expand Down
Loading

0 comments on commit 1e6051d

Please sign in to comment.