Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Adds support for EPK leases #2013

Merged
merged 48 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
5bdd578
Rename
ealsur Nov 2, 2020
a10d15b
refactor
ealsur Nov 2, 2020
6b4312c
initial if
ealsur Nov 2, 2020
bb5012a
versioning the lease schema
ealsur Nov 3, 2020
8274686
serializable
ealsur Nov 3, 2020
8767143
serialization tests
ealsur Nov 3, 2020
e300aa8
Refactor synchronizer
ealsur Nov 4, 2020
2bab5bf
no need for this Task
ealsur Nov 4, 2020
6821fc3
Wiring iterator
ealsur Nov 4, 2020
a1dd2f8
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 4, 2020
51a02f8
Wiring split/merge detection
ealsur Nov 4, 2020
e0a313f
fixing tests
ealsur Nov 4, 2020
966728f
usings
ealsur Nov 4, 2020
8fbf562
wiring lease upgrade
ealsur Nov 4, 2020
de3485f
new tests
ealsur Nov 4, 2020
3ddb7e3
more tests
ealsur Nov 4, 2020
bb371d9
custom routing
ealsur Nov 5, 2020
81d53da
mockable
ealsur Nov 6, 2020
14cd037
adding more tests
ealsur Nov 6, 2020
39baece
more tests
ealsur Nov 6, 2020
dad768f
More tests
ealsur Nov 6, 2020
5d6819e
tests
ealsur Nov 6, 2020
3d2ec91
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 13, 2020
f8636b2
Refactoring and more tests
ealsur Nov 17, 2020
e2ae264
Refactor for clarity
ealsur Nov 17, 2020
4f9a996
Adding notes
ealsur Nov 17, 2020
c751b00
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 17, 2020
5b2350f
More tests
ealsur Nov 18, 2020
a195c82
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 19, 2020
4a8daf5
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 23, 2020
70ce7c4
merge with master fixes
ealsur Nov 23, 2020
9a90273
merge with master
ealsur Dec 2, 2020
d2134f8
new interfaces and merge conflicts
ealsur Dec 2, 2020
bc670ea
Merge branch 'master' into users/ealsur/mergecfp
j82w Dec 2, 2020
98f3e38
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 7, 2020
0c45e1f
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
67b7677
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
8581d86
Update ChangeFeedPartitionKeyResultSetIteratorCore.cs
ealsur Dec 7, 2020
2f3158e
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
4e341bc
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
6c4f2b8
Update ChangeFeedPartitionKeyResultSetIteratorCore.cs
ealsur Dec 7, 2020
b24763f
fixing filtering
ealsur Dec 9, 2020
ed0e1f5
using requestinvokerhandler
ealsur Dec 9, 2020
c3cc446
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 9, 2020
6caba43
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 12, 2020
7f398ae
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 14, 2020
0e9a697
Adding a comment
ealsur Dec 14, 2020
0c3a06e
Removing extra converter
ealsur Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,88 @@
namespace Microsoft.Azure.Cosmos.ChangeFeed
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Documents;

/// <summary>
/// Cosmos Change Feed Iterator for a particular Partition Key Range
/// </summary>
internal sealed class ChangeFeedPartitionKeyResultSetIteratorCore : FeedIteratorInternal
{
public static ChangeFeedPartitionKeyResultSetIteratorCore Create(
DocumentServiceLease lease,
string continuationToken,
int? maxItemCount,
ContainerInternal container,
DateTime? startTime,
bool startFromBeginning,
Routing.PartitionKeyRangeCache partitionKeyRangeCache = null)
{
// If the lease represents a full partition (old schema) then use a FeedRangePartitionKeyRange
// If the lease represents an EPK range (new schema) the use the FeedRange in the lease
FeedRangeInternal feedRange = lease is DocumentServiceLeaseCoreEpk ? lease.FeedRange : new FeedRangePartitionKeyRange(lease.CurrentLeaseToken);

ChangeFeedStartFrom startFrom;
if (continuationToken != null)
{
// For continuation based feed range we need to manufactor a new continuation token that has the partition key range id in it.
startFrom = new ChangeFeedStartFromContinuationAndFeedRange(continuationToken, feedRange);
}
else if (startTime.HasValue)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
startFrom = ChangeFeedStartFrom.Time(startTime.Value, feedRange);
}
else if (startFromBeginning)
{
startFrom = ChangeFeedStartFrom.Beginning(feedRange);
}
else
{
startFrom = ChangeFeedStartFrom.Now(feedRange);
}

ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions()
{
PageSizeHint = maxItemCount,
};

return new ChangeFeedPartitionKeyResultSetIteratorCore(
container: container,
changeFeedStartFrom: startFrom,
feedRangeInternal: feedRange,
options: requestOptions,
partitionKeyRangeCache: partitionKeyRangeCache);
}

private readonly CosmosClientContext clientContext;
private readonly ContainerInternal container;
private readonly ChangeFeedRequestOptions changeFeedOptions;
private readonly FeedRangeInternal feedRangeInternal;
private readonly AsyncLazy<TryCatch<string>> lazyContainerRid;
private ChangeFeedStartFrom changeFeedStartFrom;
private bool hasMoreResultsInternal;
private Routing.PartitionKeyRangeCache partitionKeyRangeCache;

public ChangeFeedPartitionKeyResultSetIteratorCore(
CosmosClientContext clientContext,
private ChangeFeedPartitionKeyResultSetIteratorCore(
FeedRangeInternal feedRangeInternal,
ealsur marked this conversation as resolved.
Show resolved Hide resolved
ContainerInternal container,
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedRequestOptions options)
ChangeFeedRequestOptions options,
Routing.PartitionKeyRangeCache partitionKeyRangeCache)
{
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.changeFeedStartFrom = changeFeedStartFrom;
this.changeFeedStartFrom = changeFeedStartFrom ?? throw new ArgumentNullException(nameof(changeFeedStartFrom));
this.feedRangeInternal = feedRangeInternal ?? throw new ArgumentNullException(nameof(feedRangeInternal));
this.partitionKeyRangeCache = partitionKeyRangeCache;
this.clientContext = this.container.ClientContext;
this.changeFeedOptions = options;
this.lazyContainerRid = new AsyncLazy<TryCatch<string>>(valueFactory: (innerCancellationToken) => this.TryInitializeContainerRIdAsync(innerCancellationToken));
}

public override bool HasMoreResults => this.hasMoreResultsInternal;
Expand All @@ -46,16 +103,57 @@ public override CosmosElement GetCosmosElementContinuationToken()
/// <returns>A change feed response from cosmos service</returns>
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
PartitionKeyRange targetPartitionKeyRange = null;
Documents.Routing.Range<string> targetEPKRange = null;

if (this.ShouldApplyEPKFiltering(out FeedRangeEpk feedRangeEpk))
{
if (!this.lazyContainerRid.ValueInitialized)
{
TryCatch<string> tryInitializeContainerRId = await this.lazyContainerRid.GetValueAsync(cancellationToken);
if (!tryInitializeContainerRId.Succeeded)
{
CosmosException cosmosException = tryInitializeContainerRId.Exception.InnerException as CosmosException;
return cosmosException.ToCosmosResponseMessage(new RequestMessage());
}

this.partitionKeyRangeCache = await this.clientContext.DocumentClient.GetPartitionKeyRangeCacheAsync();
}

// Verify if an EPK based lease now targets more than 1 partition, could happen because of a split
IReadOnlyList<PartitionKeyRange> overlappingRanges = await this.partitionKeyRangeCache.TryGetOverlappingRangesAsync(
collectionRid: this.lazyContainerRid.Result.Result,
range: feedRangeEpk.Range);

// Force lease to be handled for split
if ((overlappingRanges == null) || (overlappingRanges.Count != 1))
{
ResponseMessage goneResponse = new ResponseMessage(System.Net.HttpStatusCode.Gone);
goneResponse.Headers.SubStatusCode = SubStatusCodes.PartitionKeyRangeGone;
return goneResponse;
}

targetPartitionKeyRange = overlappingRanges[0];
targetEPKRange = feedRangeEpk.Range;
}

ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
cosmosContainerCore: this.container,
resourceUri: this.container.LinkUri,
resourceType: Documents.ResourceType.Document,
operationType: Documents.OperationType.ReadFeed,
resourceType: ResourceType.Document,
operationType: OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
requestEnricher: (requestMessage) =>
{
// Set time headers if any
ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(requestMessage);
this.changeFeedStartFrom.Accept(visitor);

// Explicitly set the EPK filtering if needed
ChangeFeedPartitionKeyResultSetIteratorCore.SetEffectivePartitionKeyFiltersIfNeeded(
requestMessage,
targetPartitionKeyRange,
targetEPKRange);
},
partitionKey: default,
streamPayload: default,
Expand All @@ -66,18 +164,52 @@ public override async Task<ResponseMessage> ReadNextAsync(CancellationToken canc
string etag = responseMessage.Headers.ETag;
this.hasMoreResultsInternal = responseMessage.IsSuccessStatusCode;
responseMessage.Headers.ContinuationToken = etag;
FeedRangeInternal feedRange = this.changeFeedStartFrom switch
{
ChangeFeedStartFromNow now => now.FeedRange,
ChangeFeedStartFromTime time => time.FeedRange,
ChangeFeedStartFromContinuation continuation => throw new NotSupportedException(),
ChangeFeedStartFromBeginning beginning => beginning.FeedRange,
ChangeFeedStartFromContinuationAndFeedRange continuationAndFeedRange => continuationAndFeedRange.FeedRange,
_ => throw new InvalidOperationException(),
};
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, feedRange);
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, this.feedRangeInternal);

return responseMessage;
}

private static void SetEffectivePartitionKeyFiltersIfNeeded(
RequestMessage requestMessage,
PartitionKeyRange targetPartitionKeyRange,
Documents.Routing.Range<string> targetRange)
{
if (targetPartitionKeyRange == null
|| targetRange == null)
{
// No specific routing was set
return;
}

requestMessage.PartitionKeyRangeId = new PartitionKeyRangeIdentity(targetPartitionKeyRange.Id);
requestMessage.Headers[HttpConstants.HttpHeaders.ReadFeedKeyType] = RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString();
requestMessage.Headers[HttpConstants.HttpHeaders.StartEpk] = targetRange.Min;
requestMessage.Headers[HttpConstants.HttpHeaders.EndEpk] = targetRange.Max;
}

private bool ShouldApplyEPKFiltering(out FeedRangeEpk feedRangeEpk)
{
if (this.feedRangeInternal is FeedRangeEpk feedRangeEpkInternal)
{
feedRangeEpk = feedRangeEpkInternal;
return true;
}

feedRangeEpk = null;
return false;
}

private async Task<TryCatch<string>> TryInitializeContainerRIdAsync(CancellationToken cancellationToken)
{
try
{
string containerRId = await this.container.GetRIDAsync(cancellationToken);
return TryCatch<string>.FromResult(containerRId);
}
catch (CosmosException cosmosException)
{
return TryCatch<string>.FromException(cosmosException);
}
}
}
}
Loading