Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeed : Adds ChangeFeedStartFrom to support StartTimes x FeedRanges #1725

Merged
merged 33 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0d924ee
Revert "Revert "ChangeFeedRequestOptions Refactor (#1332)" (#1684)"
bchong95 Jul 8, 2020
0398750
marking test as ignored as agreed upon
bchong95 Jul 9, 2020
0c8c604
Merge branch 'master' into revert-1684-users/jawilley/contract/change…
sboshra Jul 9, 2020
cc5b1a8
merged
bchong95 Jul 20, 2020
c6dbda7
Merge branch 'revert-1684-users/jawilley/contract/changefeed_revert' …
bchong95 Jul 20, 2020
4dba475
Merge branch 'master' into revert-1684-users/jawilley/contract/change…
bchong95 Jul 23, 2020
3d67d53
wired up changes
bchong95 Jul 23, 2020
f4e1be5
merged
bchong95 Jul 24, 2020
cb697d9
resolved iteration comments
bchong95 Jul 24, 2020
2bbbbd2
merged
bchong95 Jul 25, 2020
e3746f8
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
bchong95 Aug 6, 2020
990e8b0
merged
bchong95 Aug 6, 2020
72ecf96
made start from a required field
bchong95 Aug 6, 2020
e024d63
added feedback from API review
bchong95 Aug 7, 2020
9018e52
resolved iteration comments
bchong95 Aug 7, 2020
beb5650
fixed tests
bchong95 Aug 7, 2020
366d0d3
fixed feed range docs
bchong95 Aug 7, 2020
ab91449
removed unreachable code
bchong95 Aug 7, 2020
dd76679
fixed tests
bchong95 Aug 8, 2020
cdfdd1e
fixed tests
bchong95 Aug 10, 2020
223d91d
fixed mocks
bchong95 Aug 10, 2020
d098458
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 10, 2020
860f221
resolved iteration comments
bchong95 Aug 10, 2020
33a803e
Merge branch 'users/brchon/ChangeFeed/StartFromFeedRange' of https://…
bchong95 Aug 10, 2020
3b224b0
fixed build issue
bchong95 Aug 10, 2020
7c5e7ab
fixed test build errors
bchong95 Aug 10, 2020
4ae6f31
more build fixes
bchong95 Aug 10, 2020
a486d30
updated preview API json
bchong95 Aug 10, 2020
412c98e
merged
bchong95 Aug 10, 2020
dc93109
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 10, 2020
c3707f2
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
7b045bd
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
be3bf1f
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,12 +25,14 @@ internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal
private readonly CosmosClientContext clientContext;
private readonly ChangeFeedRequestOptions changeFeedOptions;
private readonly AsyncLazy<TryCatch<string>> lazyContainerRid;
private ChangeFeedStartFrom changeFeedStartFrom;
private bool hasMoreResults;

private FeedRangeContinuation FeedRangeContinuation;

public ChangeFeedIteratorCore(
ContainerInternal container,
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedRequestOptions changeFeedRequestOptions)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
Expand All @@ -43,24 +44,19 @@ public ChangeFeedIteratorCore(
});
this.hasMoreResults = true;

if (this.changeFeedOptions?.From is ChangeFeedRequestOptions.StartFromContinuation startFromContinuation)
this.changeFeedStartFrom = changeFeedStartFrom;
if (this.changeFeedStartFrom is ChangeFeedStartFromContinuation startFromContinuation)
{
if (!FeedRangeContinuation.TryParse(startFromContinuation.Continuation, out FeedRangeContinuation feedRangeContinuation))
{
throw new ArgumentException(string.Format(ClientResources.FeedToken_UnknownFormat, startFromContinuation.Continuation));
}

this.FeedRangeContinuation = feedRangeContinuation;
this.changeFeedOptions.FeedRange = feedRangeContinuation.GetFeedRange();
string continuationToken = feedRangeContinuation.GetContinuation();
if (continuationToken != null)
{
this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuationToken);
}
else
{
this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromBeginning();
}
FeedRange feedRange = feedRangeContinuation.GetFeedRange();
string etag = feedRangeContinuation.GetContinuation();

this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)feedRange);
}
}

Expand All @@ -76,7 +72,9 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
CosmosDiagnosticsContext diagnostics = CosmosDiagnosticsContext.Create(this.changeFeedOptions);
using (diagnostics.GetOverallScope())
{
diagnostics.AddDiagnosticsInternal(new FeedRangeStatistics(this.changeFeedOptions.FeedRange));
diagnostics.AddDiagnosticsInternal(
new FeedRangeStatistics(
this.changeFeedStartFrom.Accept(ChangeFeedRangeExtractor.Singleton)));
if (!this.lazyContainerRid.ValueInitialized)
{
using (diagnostics.CreateScope("InitializeContainerResourceId"))
Expand Down Expand Up @@ -130,33 +128,28 @@ private async Task<ResponseMessage> ReadNextInternalAsync(
{
cancellationToken.ThrowIfCancellationRequested();

string continuation = this.FeedRangeContinuation.GetContinuation();
if (continuation != null)
{
this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuation);
}

if ((this.changeFeedOptions.FeedRange == null) || this.changeFeedOptions.FeedRange is FeedRangeEpk)
{
// For now the backend does not support EPK Ranges if they don't line up with a PKRangeId
// So if the range the user supplied is a logical pk value, then we don't want to overwrite it.
this.changeFeedOptions.FeedRange = this.FeedRangeContinuation.GetFeedRange();
}

ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.container.LinkUri,
resourceType: ResourceType.Document,
operationType: OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
cosmosContainerCore: this.container,
requestEnricher: default,
requestEnricher: (request) =>
{
ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(request);
this.changeFeedStartFrom.Accept(visitor);
},
partitionKey: default,
streamPayload: default,
diagnosticsContext: diagnosticsScope,
cancellationToken: cancellationToken);

if (await this.ShouldRetryAsync(responseMessage, cancellationToken))
{
string etag = this.FeedRangeContinuation.GetContinuation();
FeedRange feedRange = this.FeedRangeContinuation.GetFeedRange();
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)feedRange);

return await this.ReadNextInternalAsync(diagnosticsScope, cancellationToken);
}

Expand All @@ -166,6 +159,11 @@ private async Task<ResponseMessage> ReadNextInternalAsync(
// Change Feed read uses Etag for continuation
this.hasMoreResults = responseMessage.IsSuccessStatusCode;
this.FeedRangeContinuation.ReplaceContinuation(responseMessage.Headers.ETag);

string etag = this.FeedRangeContinuation.GetContinuation();
FeedRange feedRange = this.FeedRangeContinuation.GetFeedRange();
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)feedRange);

return FeedRangeResponse.CreateSuccess(
responseMessage,
this.FeedRangeContinuation);
Expand Down Expand Up @@ -216,13 +214,14 @@ private async Task InitializeFeedContinuationAsync(CancellationToken cancellatio
{
FeedRangePartitionKeyRangeExtractor feedRangePartitionKeyRangeExtractor = new FeedRangePartitionKeyRangeExtractor(this.container);

IReadOnlyList<Documents.Routing.Range<string>> ranges = await ((FeedRangeInternal)this.changeFeedOptions.FeedRange).AcceptAsync(
FeedRange feedRange = this.changeFeedStartFrom.Accept(ChangeFeedRangeExtractor.Singleton);
IReadOnlyList<Documents.Routing.Range<string>> ranges = await ((FeedRangeInternal)feedRange).AcceptAsync(
feedRangePartitionKeyRangeExtractor,
cancellationToken);

this.FeedRangeContinuation = new FeedRangeCompositeContinuation(
containerRid: this.lazyContainerRid.Result.Result,
feedRange: (FeedRangeInternal)this.changeFeedOptions.FeedRange,
feedRange: (FeedRangeInternal)feedRange,
ranges: ranges);
}
else if (this.FeedRangeContinuation?.FeedRange is FeedRangePartitionKeyRange feedRangePartitionKeyRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;
using System.Net;
Expand All @@ -18,16 +18,18 @@ internal sealed class ChangeFeedPartitionKeyResultSetIteratorCore : FeedIterator
private readonly CosmosClientContext clientContext;
private readonly ContainerInternal container;
private readonly ChangeFeedRequestOptions changeFeedOptions;
private ChangeFeedStartFrom changeFeedStartFrom;
private bool hasMoreResultsInternal;
private string continuationToken;

public ChangeFeedPartitionKeyResultSetIteratorCore(
CosmosClientContext clientContext,
ContainerInternal container,
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedRequestOptions options)
{
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.changeFeedStartFrom = changeFeedStartFrom;
this.changeFeedOptions = options;
}

Expand All @@ -42,27 +44,29 @@ public ChangeFeedPartitionKeyResultSetIteratorCore(
/// <returns>A change feed response from cosmos service</returns>
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default(CancellationToken))
{
// Change Feed read uses Etag for continuation
if (this.continuationToken != null)
{
this.changeFeedOptions.From = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(this.continuationToken);
}

ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
cosmosContainerCore: this.container,
resourceUri: this.container.LinkUri,
resourceType: Documents.ResourceType.Document,
operationType: Documents.OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
requestEnricher: default,
requestEnricher: (requestMessage) =>
{
ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(requestMessage);
this.changeFeedStartFrom.Accept(visitor);
},
partitionKey: default,
streamPayload: default,
diagnosticsContext: default,
cancellationToken: cancellationToken);

this.continuationToken = responseMessage.Headers.ETag;
// Change Feed uses etag as continuation token.
string etag = responseMessage.Headers.ETag;
this.hasMoreResultsInternal = responseMessage.IsSuccessStatusCode;
responseMessage.Headers.ContinuationToken = this.continuationToken;
responseMessage.Headers.ContinuationToken = etag;

FeedRangeInternal feedRange = (FeedRangeInternal)this.changeFeedStartFrom.Accept(ChangeFeedRangeExtractor.Singleton);
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, feedRange);

return responseMessage;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;

internal sealed class ChangeFeedRangeExtractor : ChangeFeedStartFromVisitor<FeedRange>
{
public static readonly ChangeFeedRangeExtractor Singleton = new ChangeFeedRangeExtractor();

private ChangeFeedRangeExtractor()
{
}

public override FeedRange Visit(ChangeFeedStartFromNow startFromNow) => startFromNow.FeedRange;

public override FeedRange Visit(ChangeFeedStartFromTime startFromTime) => startFromTime.FeedRange;

public override FeedRange Visit(ChangeFeedStartFromContinuation startFromContinuation)
=> throw new NotSupportedException($"{nameof(ChangeFeedStartFromContinuation)} does not have a feed range.");

public override FeedRange Visit(ChangeFeedStartFromBeginning startFromBeginning) => startFromBeginning.FeedRange;

public override FeedRange Visit(ChangeFeedStartFromContinuationAndFeedRange startFromContinuationAndFeedRange) => startFromContinuationAndFeedRange.FeedRange;
}
}
105 changes: 105 additions & 0 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedStartFrom.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using Microsoft.Azure.Cosmos.ChangeFeed;

/// <summary>
/// Base class for where to start a ChangeFeed operation in <see cref="ChangeFeedRequestOptions"/>.
/// </summary>
/// <remarks>Use one of the static constructors to generate a StartFrom option.</remarks>
#if PREVIEW
public
#else
internal
#endif
abstract class ChangeFeedStartFrom
{
/// <summary>
/// Initializes an instance of the <see cref="ChangeFeedStartFrom"/> class.
/// </summary>
internal ChangeFeedStartFrom()
{
// Internal so people can't derive from this type.
}

internal abstract void Accept(ChangeFeedStartFromVisitor visitor);

internal abstract TResult Accept<TResult>(ChangeFeedStartFromVisitor<TResult> visitor);

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.
/// </summary>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.</returns>
public static ChangeFeedStartFrom Now() => Now(FeedRangeEpk.FullRange);

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.
/// </summary>
/// <param name="feedRange">The range to start from.</param>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from this moment onward.</returns>
public static ChangeFeedStartFrom Now(FeedRange feedRange)
{
if (!(feedRange is FeedRangeInternal feedRangeInternal))
{
throw new ArgumentException($"{nameof(feedRange)} needs to be a {nameof(FeedRangeInternal)}.");
}

return new ChangeFeedStartFromNow(feedRangeInternal);
}

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.
/// </summary>
/// <param name="dateTimeUtc">The time (in UTC) to start reading from.</param>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.</returns>
public static ChangeFeedStartFrom Time(DateTime dateTimeUtc) => Time(dateTimeUtc, FeedRangeEpk.FullRange);

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.
/// </summary>
/// <param name="dateTimeUtc">The time to start reading from.</param>
/// <param name="feedRange">The range to start from.</param>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from some point in time onward.</returns>
public static ChangeFeedStartFrom Time(DateTime dateTimeUtc, FeedRange feedRange)
{
if (!(feedRange is FeedRangeInternal feedRangeInternal))
{
throw new ArgumentException($"{nameof(feedRange)} needs to be a {nameof(FeedRangeInternal)}.");
}

return new ChangeFeedStartFromTime(dateTimeUtc, feedRangeInternal);
}

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from a save point.
/// </summary>
/// <param name="continuationToken">The continuation to resume from.</param>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from a save point.</returns>
public static ChangeFeedStartFrom ContinuationToken(string continuationToken) => new ChangeFeedStartFromContinuation(continuationToken);

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start from the beginning of time.
/// </summary>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from the beginning of time.</returns>
public static ChangeFeedStartFrom Beginning() => Beginning(FeedRangeEpk.FullRange);

/// <summary>
/// Creates a <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start from the beginning of time.
/// </summary>
/// <param name="feedRange">The range to start from.</param>
/// <returns>A <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from the beginning of time.</returns>
public static ChangeFeedStartFrom Beginning(FeedRange feedRange)
{
if (!(feedRange is FeedRangeInternal feedRangeInternal))
{
throw new ArgumentException($"{nameof(feedRange)} needs to be a {nameof(FeedRangeInternal)}.");
}

return new ChangeFeedStartFromBeginning(feedRangeInternal);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.ChangeFeed
{
using System;

/// <summary>
/// Derived instance of <see cref="ChangeFeedStartFrom"/> that tells the ChangeFeed operation to start reading changes from the beginning of time.
/// </summary>
internal sealed class ChangeFeedStartFromBeginning : ChangeFeedStartFrom
{
/// <summary>
/// Initializes an instance of the <see cref="ChangeFeedStartFromBeginning"/> class.
/// </summary>
/// <param name="feedRange">The (optional) range to start from.</param>
public ChangeFeedStartFromBeginning(FeedRangeInternal feedRange)
: base()
{
this.FeedRange = feedRange ?? throw new ArgumentNullException(nameof(feedRange));
}

/// <summary>
/// Gets the (optional) range to start from.
/// </summary>
public FeedRangeInternal FeedRange { get; }

internal override void Accept(ChangeFeedStartFromVisitor visitor) => visitor.Visit(this);

internal override TResult Accept<TResult>(ChangeFeedStartFromVisitor<TResult> visitor) => visitor.Visit(this);
}
}
Loading