Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChangeFeed : Adds ChangeFeedStartFrom to support StartTimes x FeedRanges #1725

Merged
merged 33 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0d924ee
Revert "Revert "ChangeFeedRequestOptions Refactor (#1332)" (#1684)"
bchong95 Jul 8, 2020
0398750
marking test as ignored as agreed upon
bchong95 Jul 9, 2020
0c8c604
Merge branch 'master' into revert-1684-users/jawilley/contract/change…
sboshra Jul 9, 2020
cc5b1a8
merged
bchong95 Jul 20, 2020
c6dbda7
Merge branch 'revert-1684-users/jawilley/contract/changefeed_revert' …
bchong95 Jul 20, 2020
4dba475
Merge branch 'master' into revert-1684-users/jawilley/contract/change…
bchong95 Jul 23, 2020
3d67d53
wired up changes
bchong95 Jul 23, 2020
f4e1be5
merged
bchong95 Jul 24, 2020
cb697d9
resolved iteration comments
bchong95 Jul 24, 2020
2bbbbd2
merged
bchong95 Jul 25, 2020
e3746f8
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
bchong95 Aug 6, 2020
990e8b0
merged
bchong95 Aug 6, 2020
72ecf96
made start from a required field
bchong95 Aug 6, 2020
e024d63
added feedback from API review
bchong95 Aug 7, 2020
9018e52
resolved iteration comments
bchong95 Aug 7, 2020
beb5650
fixed tests
bchong95 Aug 7, 2020
366d0d3
fixed feed range docs
bchong95 Aug 7, 2020
ab91449
removed unreachable code
bchong95 Aug 7, 2020
dd76679
fixed tests
bchong95 Aug 8, 2020
cdfdd1e
fixed tests
bchong95 Aug 10, 2020
223d91d
fixed mocks
bchong95 Aug 10, 2020
d098458
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 10, 2020
860f221
resolved iteration comments
bchong95 Aug 10, 2020
33a803e
Merge branch 'users/brchon/ChangeFeed/StartFromFeedRange' of https://…
bchong95 Aug 10, 2020
3b224b0
fixed build issue
bchong95 Aug 10, 2020
7c5e7ab
fixed test build errors
bchong95 Aug 10, 2020
4ae6f31
more build fixes
bchong95 Aug 10, 2020
a486d30
updated preview API json
bchong95 Aug 10, 2020
412c98e
merged
bchong95 Aug 10, 2020
dc93109
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 10, 2020
c3707f2
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
7b045bd
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
be3bf1f
Merge branch 'master' into users/brchon/ChangeFeed/StartFromFeedRange
j82w Aug 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,18 @@ public PartitionSupervisorFactoryCore(
FeedProcessorFactory<T> partitionProcessorFactory,
ChangeFeedLeaseOptions options)
{
if (observerFactory == null) throw new ArgumentNullException(nameof(observerFactory));
if (leaseManager == null) throw new ArgumentNullException(nameof(leaseManager));
if (options == null) throw new ArgumentNullException(nameof(options));
if (partitionProcessorFactory == null) throw new ArgumentNullException(nameof(partitionProcessorFactory));

this.observerFactory = observerFactory;
this.leaseManager = leaseManager;
this.changeFeedLeaseOptions = options;
this.partitionProcessorFactory = partitionProcessorFactory;
this.observerFactory = observerFactory ?? throw new ArgumentNullException(nameof(observerFactory));
this.leaseManager = leaseManager ?? throw new ArgumentNullException(nameof(leaseManager));
this.changeFeedLeaseOptions = options ?? throw new ArgumentNullException(nameof(options));
this.partitionProcessorFactory = partitionProcessorFactory ?? throw new ArgumentNullException(nameof(partitionProcessorFactory));
}

public override PartitionSupervisor Create(DocumentServiceLease lease)
{
if (lease == null)
{
throw new ArgumentNullException(nameof(lease));
}

ChangeFeedObserver<T> changeFeedObserver = this.observerFactory.CreateObserver();
FeedProcessor processor = this.partitionProcessorFactory.Create(lease, changeFeedObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,35 @@ public static ChangeFeedPartitionKeyResultSetIteratorCore BuildResultSetIterator
DateTime? startTime,
bool startFromBeginning)
{
ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions();
if (startTime.HasValue)
FeedRange feedRange = new FeedRangePartitionKeyRange(partitionKeyRangeId);

ChangeFeedRequestOptions.StartFrom startFrom;
if (continuationToken != null)
{
// For continuation based feed range we need to manufactor a new continuation token that has the partition key range id in it.
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromContinuation(continuationToken);
throw new NotImplementedException("Need to implement after I see what the continuation token looks like.");
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}
else if (startTime.HasValue)
{
requestOptions.StartTime = startTime.Value;
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromTimeWithRange(startTime.Value, feedRange);
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}
else if (startFromBeginning)
{
requestOptions.StartTime = ChangeFeedRequestOptions.DateTimeStartFromBeginning;
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromBeginningWithRange(feedRange);
}
else
{
startFrom = ChangeFeedRequestOptions.StartFrom.CreateFromNowWithRange(feedRange);
}

ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions()
{
MaxItemCount = maxItemCount,
From = startFrom,
};

return new ChangeFeedPartitionKeyResultSetIteratorCore(
partitionKeyRangeId: partitionKeyRangeId,
continuationToken: continuationToken,
maxItemCount: maxItemCount,
clientContext: container.ClientContext,
container: container,
options: requestOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ private FeedRangeCompositeContinuation(
this.CompositeContinuationTokens = new Queue<CompositeContinuationToken>();
}

public override void Accept(
FeedRangeVisitor visitor,
Action<RequestMessage, string> fillContinuation)
{
visitor.Visit(this, fillContinuation);
}

public FeedRangeCompositeContinuation(
string containerRid,
FeedRangeInternal feedRange,
Expand Down Expand Up @@ -100,6 +93,21 @@ public FeedRangeCompositeContinuation(

public override string GetContinuation() => this.CurrentToken?.Token;

public override FeedRange GetFeedRange()
{
if (this.FeedRange is FeedRangePartitionKeyRange)
{
return this.FeedRange;
}

if (this.CurrentToken != null)
{
return new FeedRangeEPK(this.CurrentToken.Range);
}

return null;
}

public override string ToString()
{
return JsonConvert.SerializeObject(this);
Expand Down Expand Up @@ -228,7 +236,7 @@ private static bool TryParseAsCompositeContinuationToken(
{
return false;
}
}
}

private static CompositeContinuationToken CreateCompositeContinuationTokenForRange(
string minInclusive,
Expand All @@ -252,7 +260,7 @@ private void MoveToNextToken()
// Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done
this.CompositeContinuationTokens.Enqueue(recentToken);
}

this.CurrentToken = this.CompositeContinuationTokens.Count > 0 ? this.CompositeContinuationTokens.Peek() : null;
}

Expand Down Expand Up @@ -316,5 +324,7 @@ private void CreateChildRanges(IReadOnlyList<Documents.PartitionKeyRange> keyRan

return keyRanges;
}

public override void Accept(IFeedRangeContinuationVisitor visitor) => visitor.Visit(this);
}
}
2 changes: 2 additions & 0 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,7 @@ public static FeedRange FromJsonString(string toStringValue)

return parsedRange;
}

public static FeedRange CreateFromPartitionKey(PartitionKey partitionKey) => new FeedRangePartitionKey(partitionKey);
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}
}
8 changes: 4 additions & 4 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRangeContinuation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ public FeedRangeContinuation(
this.ContainerRid = containerRid;
}

public abstract void Accept(
FeedRangeVisitor visitor,
Action<RequestMessage, string> fillContinuation);

public abstract string GetContinuation();

public abstract FeedRange GetFeedRange();

public abstract void ReplaceContinuation(string continuationToken);

public abstract bool IsDone { get; }
Expand All @@ -64,5 +62,7 @@ public static bool TryParse(
ContainerInternal containerCore,
ResponseMessage responseMessage,
CancellationToken cancellationToken);

public abstract void Accept(IFeedRangeContinuationVisitor visitor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;

/// <summary>
/// Visitor to populate RequestMessage headers and properties based on FeedRange.
/// </summary>
internal sealed class FeedRangeContinuationRequestMessagePopulatorVisitor : IFeedRangeContinuationVisitor
{
private readonly RequestMessage request;
private readonly Action<RequestMessage, string> fillContinuation;

public FeedRangeContinuationRequestMessagePopulatorVisitor(RequestMessage request, Action<RequestMessage, string> fillContinuation)
{
this.request = request ?? throw new ArgumentNullException(nameof(request));
this.fillContinuation = fillContinuation ?? throw new ArgumentNullException(nameof(fillContinuation));
}

public void Visit(FeedRangeCompositeContinuation feedRangeCompositeContinuation)
{
// In case EPK has already been set by compute
if (!this.request.Properties.ContainsKey(HandlerConstants.StartEpkString))
{
this.request.Properties[HandlerConstants.StartEpkString] = feedRangeCompositeContinuation.CurrentToken.Range.Min;
this.request.Properties[HandlerConstants.EndEpkString] = feedRangeCompositeContinuation.CurrentToken.Range.Max;
}

this.fillContinuation(this.request, feedRangeCompositeContinuation.GetContinuation());
}
}
}
37 changes: 19 additions & 18 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEPK.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
Expand All @@ -15,29 +16,28 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class FeedRangeEPK : FeedRangeInternal
{
public Documents.Routing.Range<string> Range { get; }

public static FeedRangeEPK ForFullRange()
{
return new FeedRangeEPK(new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false));
}
public static readonly FeedRangeEPK FullRange = new FeedRangeEPK(new Documents.Routing.Range<string>(
Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey,
Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey,
isMinInclusive: true,
isMaxInclusive: false));

public FeedRangeEPK(Documents.Routing.Range<string> range)
{
if (range == null)
{
throw new ArgumentNullException(nameof(range));
}

this.Range = range;
}

public Documents.Routing.Range<string> Range { get; }

public override Task<List<Documents.Routing.Range<string>>> GetEffectiveRangesAsync(
IRoutingMapProvider routingMapProvider,
string containerRid,
Documents.PartitionKeyDefinition partitionKeyDefinition)
{
return Task.FromResult(new List<Documents.Routing.Range<string>>() { this.Range });
}
Documents.PartitionKeyDefinition partitionKeyDefinition) => Task.FromResult(new List<Documents.Routing.Range<string>>() { this.Range });

public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
IRoutingMapProvider routingMapProvider,
Expand All @@ -49,10 +49,11 @@ public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
return partitionKeyRanges.Select(partitionKeyRange => partitionKeyRange.Id);
}

public override void Accept(FeedRangeVisitor visitor)
{
visitor.Visit(this);
}
public override void Accept(IFeedRangeVisitor visitor) => visitor.Visit(this);

public override Task<TResult> AcceptAsync<TResult>(
IFeedRangeAsyncVisitor<TResult> visitor,
CancellationToken cancellationToken = default) => visitor.VisitAsync(this, cancellationToken);

public override string ToString() => this.Range.ToString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ public abstract Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
Documents.PartitionKeyDefinition partitionKeyDefinition,
CancellationToken cancellationToken);

public abstract void Accept(FeedRangeVisitor visitor);
public abstract void Accept(IFeedRangeVisitor visitor);

public abstract Task<TResult> AcceptAsync<TResult>(IFeedRangeAsyncVisitor<TResult> visitor, CancellationToken cancellationToken = default);

public abstract override string ToString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ public FeedRangePartitionKey(PartitionKey partitionKey)
public override Task<List<Documents.Routing.Range<string>>> GetEffectiveRangesAsync(
IRoutingMapProvider routingMapProvider,
string containerRid,
Documents.PartitionKeyDefinition partitionKeyDefinition)
{
return Task.FromResult(new List<Documents.Routing.Range<string>>
Documents.PartitionKeyDefinition partitionKeyDefinition) => Task.FromResult(
new List<Documents.Routing.Range<string>>
{
Documents.Routing.Range<string>.GetPointRange(
this.PartitionKey.InternalKey.GetEffectivePartitionKeyString(partitionKeyDefinition))
});
}

public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
IRoutingMapProvider routingMapProvider,
Expand All @@ -44,10 +42,11 @@ public override async Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
return new List<string>() { range.Id };
}

public override void Accept(FeedRangeVisitor visitor)
{
visitor.Visit(this);
}
public override void Accept(IFeedRangeVisitor visitor) => visitor.Visit(this);

public override Task<TResult> AcceptAsync<TResult>(
IFeedRangeAsyncVisitor<TResult> visitor,
CancellationToken cancellationToken = default) => visitor.VisitAsync(this, cancellationToken);

public override string ToString() => this.PartitionKey.InternalKey.ToJsonString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ namespace Microsoft.Azure.Cosmos
/// </summary>
internal sealed class FeedRangePartitionKeyRange : FeedRangeInternal
{
public string PartitionKeyRangeId { get; }

public FeedRangePartitionKeyRange(string partitionKeyRangeId)
{
this.PartitionKeyRangeId = partitionKeyRangeId;
}

public string PartitionKeyRangeId { get; }

public override async Task<List<Documents.Routing.Range<string>>> GetEffectiveRangesAsync(
IRoutingMapProvider routingMapProvider,
string containerRid,
Expand Down Expand Up @@ -74,10 +74,11 @@ public override Task<IEnumerable<string>> GetPartitionKeyRangesAsync(
return Task.FromResult(partitionKeyRanges);
}

public override void Accept(FeedRangeVisitor visitor)
{
visitor.Visit(this);
}
public override void Accept(IFeedRangeVisitor visitor) => visitor.Visit(this);

public override Task<TResult> AcceptAsync<TResult>(
IFeedRangeAsyncVisitor<TResult> visitor,
CancellationToken cancellationToken = default) => visitor.VisitAsync(this, cancellationToken);

public override string ToString() => this.PartitionKeyRangeId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;

internal sealed class FeedRangePartitionKeyRangeExtractor : IFeedRangeAsyncVisitor<IReadOnlyList<Documents.Routing.Range<string>>>
{
private readonly ContainerInternal container;

public FeedRangePartitionKeyRangeExtractor(ContainerInternal container)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
}

public async Task<IReadOnlyList<Documents.Routing.Range<string>>> VisitAsync(FeedRangePartitionKey feedRange, CancellationToken cancellationToken = default)
{
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
PartitionKeyDefinition partitionKeyDefinition = await this.container.GetPartitionKeyDefinitionAsync(cancellationToken);
return await feedRange.GetEffectiveRangesAsync(
partitionKeyRangeCache,
await this.container.GetRIDAsync(cancellationToken),
partitionKeyDefinition);
}

public async Task<IReadOnlyList<Documents.Routing.Range<string>>> VisitAsync(FeedRangePartitionKeyRange feedRange, CancellationToken cancellationToken = default)
{
// Migration from PKRangeId scenario
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
return await feedRange.GetEffectiveRangesAsync(
routingMapProvider: partitionKeyRangeCache,
containerRid: await this.container.GetRIDAsync(cancellationToken),
partitionKeyDefinition: null);
}

public async Task<IReadOnlyList<Documents.Routing.Range<string>>> VisitAsync(FeedRangeEPK feedRange, CancellationToken cancellationToken = default)
{
Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.container.ClientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
IReadOnlyList<PartitionKeyRange> pkRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync(
collectionRid: await this.container.GetRIDAsync(cancellationToken),
range: feedRange.Range,
forceRefresh: false);
return pkRanges.Select(pkRange => pkRange.ToRange()).ToList();
}
}
}
Loading