Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Feed Processor: Adds support for EPK leases #2013

Merged
merged 48 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
5bdd578
Rename
ealsur Nov 2, 2020
a10d15b
refactor
ealsur Nov 2, 2020
6b4312c
initial if
ealsur Nov 2, 2020
bb5012a
versioning the lease schema
ealsur Nov 3, 2020
8274686
serializable
ealsur Nov 3, 2020
8767143
serialization tests
ealsur Nov 3, 2020
e300aa8
Refactor synchronizer
ealsur Nov 4, 2020
2bab5bf
no need for this Task
ealsur Nov 4, 2020
6821fc3
Wiring iterator
ealsur Nov 4, 2020
a1dd2f8
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 4, 2020
51a02f8
Wiring split/merge detection
ealsur Nov 4, 2020
e0a313f
fixing tests
ealsur Nov 4, 2020
966728f
usings
ealsur Nov 4, 2020
8fbf562
wiring lease upgrade
ealsur Nov 4, 2020
de3485f
new tests
ealsur Nov 4, 2020
3ddb7e3
more tests
ealsur Nov 4, 2020
bb371d9
custom routing
ealsur Nov 5, 2020
81d53da
mockable
ealsur Nov 6, 2020
14cd037
adding more tests
ealsur Nov 6, 2020
39baece
more tests
ealsur Nov 6, 2020
dad768f
More tests
ealsur Nov 6, 2020
5d6819e
tests
ealsur Nov 6, 2020
3d2ec91
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 13, 2020
f8636b2
Refactoring and more tests
ealsur Nov 17, 2020
e2ae264
Refactor for clarity
ealsur Nov 17, 2020
4f9a996
Adding notes
ealsur Nov 17, 2020
c751b00
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 17, 2020
5b2350f
More tests
ealsur Nov 18, 2020
a195c82
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 19, 2020
4a8daf5
Merge branch 'master' into users/ealsur/mergecfp
ealsur Nov 23, 2020
70ce7c4
merge with master fixes
ealsur Nov 23, 2020
9a90273
merge with master
ealsur Dec 2, 2020
d2134f8
new interfaces and merge conflicts
ealsur Dec 2, 2020
bc670ea
Merge branch 'master' into users/ealsur/mergecfp
j82w Dec 2, 2020
98f3e38
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 7, 2020
0c45e1f
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
67b7677
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
8581d86
Update ChangeFeedPartitionKeyResultSetIteratorCore.cs
ealsur Dec 7, 2020
2f3158e
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
4e341bc
Update ChangeFeedPartitionKeyResultSetIteratorCoreTests.cs
ealsur Dec 7, 2020
6c4f2b8
Update ChangeFeedPartitionKeyResultSetIteratorCore.cs
ealsur Dec 7, 2020
b24763f
fixing filtering
ealsur Dec 9, 2020
ed0e1f5
using requestinvokerhandler
ealsur Dec 9, 2020
c3cc446
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 9, 2020
6caba43
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 12, 2020
7f398ae
Merge branch 'master' into users/ealsur/mergecfp
ealsur Dec 14, 2020
0e9a697
Adding a comment
ealsur Dec 14, 2020
0c3a06e
Removing extra converter
ealsur Dec 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,72 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.ChangeFeed.LeaseManagement;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

/// <summary>
/// Cosmos Change Feed Iterator for a particular Partition Key Range
/// </summary>
internal sealed class ChangeFeedPartitionKeyResultSetIteratorCore : FeedIteratorInternal
{
public static ChangeFeedPartitionKeyResultSetIteratorCore Create(
DocumentServiceLease lease,
string continuationToken,
int? maxItemCount,
ContainerInternal container,
DateTime? startTime,
bool startFromBeginning)
{
// If the lease represents a full partition (old schema) then use a FeedRangePartitionKeyRange
// If the lease represents an EPK range (new schema) the use the FeedRange in the lease
FeedRangeInternal feedRange = lease is DocumentServiceLeaseCoreEpk ? lease.FeedRange : new FeedRangePartitionKeyRange(lease.CurrentLeaseToken);

ChangeFeedStartFrom startFrom;
if (continuationToken != null)
{
// For continuation based feed range we need to manufactor a new continuation token that has the partition key range id in it.
startFrom = new ChangeFeedStartFromContinuationAndFeedRange(continuationToken, feedRange);
}
else if (startTime.HasValue)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
startFrom = ChangeFeedStartFrom.Time(startTime.Value, feedRange);
}
else if (startFromBeginning)
{
startFrom = ChangeFeedStartFrom.Beginning(feedRange);
}
else
{
startFrom = ChangeFeedStartFrom.Now(feedRange);
}

ChangeFeedRequestOptions requestOptions = new ChangeFeedRequestOptions()
{
PageSizeHint = maxItemCount,
};

return new ChangeFeedPartitionKeyResultSetIteratorCore(
container: container,
changeFeedStartFrom: startFrom,
options: requestOptions);
}

private readonly CosmosClientContext clientContext;
private readonly ContainerInternal container;
private readonly ChangeFeedRequestOptions changeFeedOptions;
private ChangeFeedStartFrom changeFeedStartFrom;
private bool hasMoreResultsInternal;

public ChangeFeedPartitionKeyResultSetIteratorCore(
CosmosClientContext clientContext,
private ChangeFeedPartitionKeyResultSetIteratorCore(
ContainerInternal container,
ChangeFeedStartFrom changeFeedStartFrom,
ChangeFeedRequestOptions options)
{
this.clientContext = clientContext ?? throw new ArgumentNullException(nameof(clientContext));
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.changeFeedStartFrom = changeFeedStartFrom;
this.changeFeedStartFrom = changeFeedStartFrom ?? throw new ArgumentNullException(nameof(changeFeedStartFrom));
this.clientContext = this.container.ClientContext;
this.changeFeedOptions = options;
}

Expand All @@ -55,11 +98,12 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella
ResponseMessage responseMessage = await this.clientContext.ProcessResourceOperationStreamAsync(
cosmosContainerCore: this.container,
resourceUri: this.container.LinkUri,
resourceType: Documents.ResourceType.Document,
operationType: Documents.OperationType.ReadFeed,
resourceType: ResourceType.Document,
operationType: OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
requestEnricher: (requestMessage) =>
{
// Set time headers if any
ChangeFeedStartFromRequestOptionPopulator visitor = new ChangeFeedStartFromRequestOptionPopulator(requestMessage);
this.changeFeedStartFrom.Accept(visitor);
},
Expand All @@ -73,10 +117,9 @@ public override async Task<ResponseMessage> ReadNextAsync(ITrace trace, Cancella
string etag = responseMessage.Headers.ETag;
this.hasMoreResultsInternal = responseMessage.IsSuccessStatusCode;
responseMessage.Headers.ContinuationToken = etag;
FeedRangeInternal feedRange = (FeedRangeInternal)this.changeFeedStartFrom.FeedRange;
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, feedRange);
this.changeFeedStartFrom = new ChangeFeedStartFromContinuationAndFeedRange(etag, (FeedRangeInternal)this.changeFeedStartFrom.FeedRange);

return responseMessage;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed.Bootstrapping
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
Expand All @@ -27,93 +26,157 @@ internal sealed class PartitionSynchronizerCore : PartitionSynchronizer
private readonly DocumentServiceLeaseContainer leaseContainer;
private readonly DocumentServiceLeaseManager leaseManager;
private readonly int degreeOfParallelism;
private readonly int maxBatchSize;
private readonly Routing.PartitionKeyRangeCache partitionKeyRangeCache;
private readonly string containerRid;

public PartitionSynchronizerCore(
ContainerInternal container,
DocumentServiceLeaseContainer leaseContainer,
DocumentServiceLeaseManager leaseManager,
int degreeOfParallelism,
int maxBatchSize)
Routing.PartitionKeyRangeCache partitionKeyRangeCache,
string containerRid)
{
this.container = container;
this.leaseContainer = leaseContainer;
this.leaseManager = leaseManager;
this.degreeOfParallelism = degreeOfParallelism;
this.maxBatchSize = maxBatchSize;
this.partitionKeyRangeCache = partitionKeyRangeCache;
this.containerRid = containerRid;
}

public override async Task CreateMissingLeasesAsync()
{
List<PartitionKeyRange> ranges = await this.EnumPartitionKeyRangesAsync().ConfigureAwait(false);
HashSet<string> partitionIds = new HashSet<string>(ranges.Select(range => range.Id));
DefaultTrace.TraceInformation("Source collection: '{0}', {1} partition(s)", this.container.LinkUri.ToString(), partitionIds.Count);
await this.CreateLeasesAsync(partitionIds).ConfigureAwait(false);
IReadOnlyList<PartitionKeyRange> ranges = await this.partitionKeyRangeCache.TryGetOverlappingRangesAsync(this.containerRid, FeedRangeEpk.FullRange.Range, forceRefresh: true);
DefaultTrace.TraceInformation("Source collection: '{0}', {1} partition(s)", this.container.LinkUri, ranges.Count);
await this.CreateLeasesAsync(ranges).ConfigureAwait(false);
}

public override async Task<IEnumerable<DocumentServiceLease>> SplitPartitionAsync(DocumentServiceLease lease)
/// <summary>
/// Handle a Partition Gone response and decide what to do based on the type of lease.
/// </summary>
/// <returns>Returns the list of leases to create and a boolean that indicates whether or not to remove the current lease.</returns>
public override async Task<(IEnumerable<DocumentServiceLease>, bool)> HandlePartitionGoneAsync(DocumentServiceLease lease)
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
if (lease == null)
{
throw new ArgumentNullException(nameof(lease));
}

string partitionId = lease.CurrentLeaseToken;
string leaseToken = lease.CurrentLeaseToken;
string lastContinuationToken = lease.ContinuationToken;

DefaultTrace.TraceInformation("Lease {0} is gone due to split", partitionId);
DefaultTrace.TraceInformation("Lease {0} is gone due to split or merge", leaseToken);

// After split the childs are either all or none available
List<PartitionKeyRange> ranges = await this.EnumPartitionKeyRangesAsync().ConfigureAwait(false);
List<string> addedPartitionIds = ranges.Where(range => range.Parents.Contains(partitionId)).Select(range => range.Id).ToList();
if (addedPartitionIds.Count == 0)
IReadOnlyList<PartitionKeyRange> overlappingRanges = await this.partitionKeyRangeCache.TryGetOverlappingRangesAsync(this.containerRid, ((FeedRangeEpk)lease.FeedRange).Range, forceRefresh: true);
if (overlappingRanges.Count == 0)
{
DefaultTrace.TraceError("Lease {0} had split but we failed to find at least one child partition", partitionId);
DefaultTrace.TraceError("Lease {0} is gone but we failed to find at least one child range", leaseToken);
throw new InvalidOperationException();
}

return lease switch
{
DocumentServiceLeaseCoreEpk feedRangeBaseLease => await this.HandlePartitionGoneAsync(leaseToken, lastContinuationToken, feedRangeBaseLease, overlappingRanges),
_ => await this.HandlePartitionGoneAsync(leaseToken, lastContinuationToken, (DocumentServiceLeaseCore)lease, overlappingRanges)
};
}

/// <summary>
/// Handles splits and merges for partition based leases.
/// </summary>
private async Task<(IEnumerable<DocumentServiceLease>, bool)> HandlePartitionGoneAsync(
string leaseToken,
string lastContinuationToken,
DocumentServiceLeaseCore partitionBasedLease,
IReadOnlyList<PartitionKeyRange> overlappingRanges)
{
ConcurrentQueue<DocumentServiceLease> newLeases = new ConcurrentQueue<DocumentServiceLease>();
await addedPartitionIds.ForEachAsync(
async addedRangeId =>
{
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(addedRangeId, lastContinuationToken).ConfigureAwait(false);
if (newLease != null)
if (overlappingRanges.Count > 1)
{
// Split: More than two children
await overlappingRanges.ForEachAsync(
async addedRange =>
{
newLeases.Enqueue(newLease);
}
},
this.degreeOfParallelism).ConfigureAwait(false);
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(addedRange, lastContinuationToken);
if (newLease != null)
{
newLeases.Enqueue(newLease);
}
},
this.degreeOfParallelism).ConfigureAwait(false);

DefaultTrace.TraceInformation("Lease {0} split into {1}", leaseToken, string.Join(", ", newLeases.Select(l => l.CurrentLeaseToken)));
}
else
{
// Merge: 1 children, multiple ranges merged into 1
PartitionKeyRange mergedRange = overlappingRanges[0];
DefaultTrace.TraceInformation("Lease {0} merged into {1}", leaseToken, mergedRange.Id);

DefaultTrace.TraceInformation("lease {0} split into {1}", partitionId, string.Join(", ", newLeases.Select(l => l.CurrentLeaseToken)));
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync((FeedRangeEpk)partitionBasedLease.FeedRange, lastContinuationToken);
if (newLease != null)
{
newLeases.Enqueue(newLease);
}
}

return newLeases;
return (newLeases, true);
}

private async Task<List<PartitionKeyRange>> EnumPartitionKeyRangesAsync()
/// <summary>
/// Handles splits and merges for feed range based leases.
/// </summary>
private async Task<(IEnumerable<DocumentServiceLease>, bool)> HandlePartitionGoneAsync(
string leaseToken,
string lastContinuationToken,
DocumentServiceLeaseCoreEpk feedRangeBasedLease,
IReadOnlyList<PartitionKeyRange> overlappingRanges)
{
string containerUri = this.container.LinkUri.ToString();
string partitionKeyRangesPath = string.Format(CultureInfo.InvariantCulture, "{0}/pkranges", containerUri);

IDocumentFeedResponse<PartitionKeyRange> response = null;
List<PartitionKeyRange> partitionKeyRanges = new List<PartitionKeyRange>();
do
List<DocumentServiceLease> newLeases = new List<DocumentServiceLease>();
if (overlappingRanges.Count > 1)
{
FeedOptions feedOptions = new FeedOptions
// Split: More than two children spanning the feed range
FeedRangeEpk splitRange = (FeedRangeEpk)feedRangeBasedLease.FeedRange;
string min = splitRange.Range.Min;
string max = splitRange.Range.Max;

// Create new leases starting from the current min and ending in the current max and across the ordered list of partitions
for (int i = 0; i < overlappingRanges.Count - 1; i++)
{
MaxItemCount = this.maxBatchSize,
RequestContinuationToken = response?.ResponseContinuation,
};
Documents.Routing.Range<string> partitionRange = overlappingRanges[i].ToRange();
Documents.Routing.Range<string> mergedRange = new Documents.Routing.Range<string>(min, partitionRange.Max, true, false);
DocumentServiceLease newLease = await this.leaseManager.CreateLeaseIfNotExistAsync(new FeedRangeEpk(mergedRange), lastContinuationToken);
if (newLease != null)
{
newLeases.Add(newLease);
}

response = await this.container.ClientContext.DocumentClient.ReadPartitionKeyRangeFeedAsync(containerUri, feedOptions).ConfigureAwait(false);
IEnumerator<PartitionKeyRange> enumerator = response.GetEnumerator();
while (enumerator.MoveNext())
min = partitionRange.Max;
}

// Add the last range with the original max and the last min from the split
Documents.Routing.Range<string> lastRangeAfterSplit = new Documents.Routing.Range<string>(min, max, true, false);
DocumentServiceLease lastLease = await this.leaseManager.CreateLeaseIfNotExistAsync(new FeedRangeEpk(lastRangeAfterSplit), lastContinuationToken);
if (lastLease != null)
{
partitionKeyRanges.Add(enumerator.Current);
newLeases.Add(lastLease);
}

DefaultTrace.TraceInformation("Lease {0} split into {1}", leaseToken, string.Join(", ", newLeases.Select(l => l.CurrentLeaseToken)));

return (newLeases, true);
}
while (!string.IsNullOrEmpty(response.ResponseContinuation));
else
{
// If we have only 1 mapped partition after the Gone, it means this epk range just remapped to another partition
newLeases.Add(feedRangeBasedLease);

DefaultTrace.TraceInformation("Lease {0} redirected to {1}", leaseToken, overlappingRanges[0].Id);

return partitionKeyRanges;
// Since the lease was just redirected, we don't need to delete it
return (newLeases, false);
}
}

/// <summary>
Expand All @@ -123,16 +186,42 @@ private async Task<List<PartitionKeyRange>> EnumPartitionKeyRangesAsync()
/// Same applies also to split partitions. We do not search for parent lease and take continuation token since this might end up
/// of reprocessing all the events since the split.
/// </summary>
private async Task CreateLeasesAsync(HashSet<string> partitionIds)
private async Task CreateLeasesAsync(IReadOnlyList<PartitionKeyRange> partitionKeyRanges)
{
// Get leases after getting ranges, to make sure that no other hosts checked in continuation token for split partition after we got leases.
IEnumerable<DocumentServiceLease> leases = await this.leaseContainer.GetAllLeasesAsync().ConfigureAwait(false);
HashSet<string> existingPartitionIds = new HashSet<string>(leases.Select(lease => lease.CurrentLeaseToken));
HashSet<string> addedPartitionIds = new HashSet<string>(partitionIds);
addedPartitionIds.ExceptWith(existingPartitionIds);
IReadOnlyList<DocumentServiceLease> leases = await this.leaseContainer.GetAllLeasesAsync().ConfigureAwait(false);
IReadOnlyList<PartitionKeyRange> rangesToAdd = partitionKeyRanges;
if (leases.Count > 0)
{
List<string> pkRangeBasedLeases = leases.Where(lease => lease is DocumentServiceLeaseCore).Select(lease => lease.CurrentLeaseToken).ToList();
List<PartitionKeyRange> missingRanges = new List<PartitionKeyRange>();
foreach (PartitionKeyRange partitionKeyRange in partitionKeyRanges)
{
// Check if there is a PKRange based lease already
if (pkRangeBasedLeases.Contains(partitionKeyRange.Id))
ealsur marked this conversation as resolved.
Show resolved Hide resolved
{
continue;
}

// Check if there are EPKBased leases for the partition range
// If there is at least one, we assume there are others that cover the rest of the full partition range
// based on the fact that the lease store was always initialized for the full collection
Documents.Routing.Range<string> partitionRange = partitionKeyRange.ToRange();
if (leases.Where(lease => lease is DocumentServiceLeaseCoreEpk
&& lease.FeedRange is FeedRangeEpk feedRangeEpk
&& (partitionRange.Min == feedRangeEpk.Range.Min || partitionRange.Max == feedRangeEpk.Range.Max)).Any())
{
continue;
}

missingRanges.Add(partitionKeyRange);
}

rangesToAdd = missingRanges;
}

await addedPartitionIds.ForEachAsync(
async addedRangeId => { await this.leaseManager.CreateLeaseIfNotExistAsync(addedRangeId, continuationToken: null).ConfigureAwait(false); },
await rangesToAdd.ForEachAsync(
async addedRange => await this.leaseManager.CreateLeaseIfNotExistAsync(addedRange, continuationToken: null).ConfigureAwait(false),
this.degreeOfParallelism).ConfigureAwait(false);
}
}
Expand Down
Loading