Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal Pagination: Adds Merge Proofing #2084

Merged
merged 11 commits into from
Jan 11, 2021
19 changes: 19 additions & 0 deletions Microsoft.Azure.Cosmos/src/FeedRange/FeedRanges/FeedRangeEpk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,24 @@ public override TResult Accept<TResult>(IFeedRangeTransformer<TResult> transform
{
return transformer.Visit(this);
}

/// <summary>
/// Determines whether <paramref name="obj"/> is a <see cref="FeedRangeEpk"/> that is equal to this instance.
/// </summary>
/// <param name="obj">The object to compare against; any value that is not a <see cref="FeedRangeEpk"/> compares unequal.</param>
/// <returns>true only when <paramref name="obj"/> is a <see cref="FeedRangeEpk"/> and the typed Equals overload reports equality.</returns>
public override bool Equals(object obj)
{
// The type pattern rejects null and foreign types, then defers to the typed overload.
return obj is FeedRangeEpk objAsFeedRangeEpk && this.Equals(objAsFeedRangeEpk);
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
/// Compares this feed range with another one by the bounds and inclusivity flags of their ranges.
/// </summary>
/// <param name="other">The feed range to compare against; null compares unequal.</param>
/// <returns>true when Min, Max, IsMinInclusive and IsMaxInclusive of both ranges are all equal.</returns>
public bool Equals(FeedRangeEpk other)
{
    if (other == null)
    {
        return false;
    }

    // Checks run in the same order as a short-circuiting chain: min, max, then the two inclusivity flags.
    if (!this.Range.Min.Equals(other.Range.Min))
    {
        return false;
    }

    if (!this.Range.Max.Equals(other.Range.Max))
    {
        return false;
    }

    if (!this.Range.IsMinInclusive.Equals(other.Range.IsMinInclusive))
    {
        return false;
    }

    return this.Range.IsMaxInclusive.Equals(other.Range.IsMaxInclusive);
}

/// <summary>
/// Computes a hash code from the bounds of this feed range.
/// </summary>
/// <returns>The XOR of the hash codes of Range.Min and Range.Max.</returns>
public override int GetHashCode()
{
// Only Min and Max feed the hash; the inclusivity flags that Equals(FeedRangeEpk) also compares are left out,
// which keeps equal instances hashing alike.
return this.Range.Min.GetHashCode() ^ this.Range.Max.GetHashCode();
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public async ValueTask<bool> MoveNextAsync(ITrace trace)
if (IsSplitException(exception))
{
// Handle split

List<FeedRangeEpk> childRanges = await this.feedRangeProvider.GetChildRangeAsync(
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
currentPaginator.Range,
childTrace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(

PartitionMapper.PartitionMapping<OrderByContinuationToken> partitionMapping = monadicGetOrderByContinuationTokenMapping.Result;
IReadOnlyList<CosmosElement> orderByItems = partitionMapping
.TargetPartition
.TargetMapping
.Values
.First()
.OrderByItems
Expand All @@ -639,9 +639,9 @@ public static TryCatch<IQueryPipelineStage> MonadicCreate(
(string leftFilter, string targetFilter, string rightFilter) = OrderByCrossPartitionQueryPipelineStage.GetFormattedFilters(columnAndItems);
List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)> tokenMappingAndFilters = new List<(IReadOnlyDictionary<FeedRangeEpk, OrderByContinuationToken>, string)>()
{
{ (partitionMapping.PartitionsLeftOfTarget, leftFilter) },
{ (partitionMapping.TargetPartition, targetFilter) },
{ (partitionMapping.PartitionsRightOfTarget, rightFilter) },
{ (partitionMapping.MappingLeftOfTarget, leftFilter) },
{ (partitionMapping.TargetMapping, targetFilter) },
{ (partitionMapping.MappingRightOfTarget, rightFilter) },
};

enumeratorsAndTokens = new List<(OrderByQueryPartitionRangePageAsyncEnumerator, OrderByContinuationToken)>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ private static TryCatch<CrossFeedRangeState<QueryState>> MonadicExtractState(
List<IReadOnlyDictionary<FeedRangeEpk, ParallelContinuationToken>> rangesToInitialize = new List<IReadOnlyDictionary<FeedRangeEpk, ParallelContinuationToken>>()
{
// Skip all the partitions left of the target range, since they have already been drained fully.
partitionMapping.TargetPartition,
partitionMapping.PartitionsRightOfTarget,
partitionMapping.TargetMapping,
partitionMapping.MappingRightOfTarget,
};

foreach (IReadOnlyDictionary<FeedRangeEpk, ParallelContinuationToken> rangeToInitalize in rangesToInitialize)
Expand Down

Large diffs are not rendered by default.

49 changes: 48 additions & 1 deletion Microsoft.Azure.Cosmos/src/Routing/PartitionKeyHashRange.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace Microsoft.Azure.Cosmos.Routing
{
using System;
using System.Text;
using Microsoft.Azure.Documents;

internal readonly struct PartitionKeyHashRange : IComparable<PartitionKeyHashRange>, IEquatable<PartitionKeyHashRange>
{
Expand Down Expand Up @@ -44,6 +43,54 @@ public bool Contains(PartitionKeyHashRange partitionKeyHashRange)
return rangeStartsBefore && rangeEndsAfter;
}

/// <summary>
/// Attempts to compute the range shared by this range and <paramref name="rangeToOverlapWith"/>.
/// </summary>
/// <param name="rangeToOverlapWith">The range to intersect with this one.</param>
/// <param name="overlappingRange">
/// On success, the range running from the larger of the two starts to the smaller of the two ends;
/// the default value otherwise.
/// </param>
/// <returns>
/// false when both computed bounds have a value and the start is not below the end; true in every other case.
/// </returns>
public bool TryGetOverlappingRange(PartitionKeyHashRange rangeToOverlapWith, out PartitionKeyHashRange overlappingRange)
{
    PartitionKeyHash? thisStart = this.StartInclusive;
    PartitionKeyHash? otherStart = rangeToOverlapWith.StartInclusive;

    // A start without a value never wins: the other side's start (which may also be absent) is taken as-is.
    PartitionKeyHash? maxOfStarts;
    if (!thisStart.HasValue)
    {
        maxOfStarts = otherStart;
    }
    else if (!otherStart.HasValue)
    {
        maxOfStarts = thisStart;
    }
    else
    {
        maxOfStarts = thisStart.Value > otherStart.Value ? thisStart : otherStart;
    }

    PartitionKeyHash? thisEnd = this.EndExclusive;
    PartitionKeyHash? otherEnd = rangeToOverlapWith.EndExclusive;

    // Mirror image for the ends: an end without a value never wins, otherwise the smaller end is kept.
    PartitionKeyHash? minOfEnds;
    if (!thisEnd.HasValue)
    {
        minOfEnds = otherEnd;
    }
    else if (!otherEnd.HasValue)
    {
        minOfEnds = thisEnd;
    }
    else
    {
        minOfEnds = thisEnd.Value < otherEnd.Value ? thisEnd : otherEnd;
    }

    // Only two concrete bounds can cross; a missing bound on either side always leaves room for an overlap.
    bool boundsCross = maxOfStarts.HasValue && minOfEnds.HasValue && (maxOfStarts >= minOfEnds);
    if (!boundsCross)
    {
        overlappingRange = new PartitionKeyHashRange(maxOfStarts, minOfEnds);
        return true;
    }

    overlappingRange = default;
    return false;
}

public int CompareTo(PartitionKeyHashRange other)
{
// Provide a total sort order by first comparing on the start and then going to the end.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ namespace Microsoft.Azure.Cosmos.Tests.Pagination
{
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestClass]
public sealed class CrossPartitionPartitionRangeEnumeratorTests
public sealed class CrossPartitionPartitionRangeEnumeratorTests
{
[TestMethod]
public async Task Test429sAsync()
Expand All @@ -33,13 +31,6 @@ public async Task Test429sWithContinuationsAsync()
await implementation.Test429sWithContinuationsAsync();
}

/// <summary>
/// Runs the TestDrainFullyAsync scenario on a fresh <see cref="Implementation"/> instance.
/// </summary>
[TestMethod]
public async Task TestDrainFullyAsync()
{
    // A new Implementation per test keeps each run independent of the others.
    await new Implementation().TestDrainFullyAsync();
}

[TestMethod]
public async Task TestEmptyPages()
{
Expand All @@ -48,24 +39,18 @@ public async Task TestEmptyPages()
}

/// <summary>
/// Runs the TestResumingFromStateAsync scenario on a fresh <see cref="Implementation"/> instance.
/// </summary>
[TestMethod]
public async Task TestResumingFromStateAsync()
{
    // A new Implementation per test keeps each run independent of the others.
    await new Implementation().TestResumingFromStateAsync();
}

/// <summary>
/// Runs the TestSplitWithDuringDrainAsync scenario on a fresh <see cref="Implementation"/> instance.
/// </summary>
[TestMethod]
public async Task TestSplitWithDuringDrainAsync()
{
    // A new Implementation per test keeps each run independent of the others.
    await new Implementation().TestSplitWithDuringDrainAsync();
}

[TestMethod]
public async Task TestSplitWithResumeContinuationAsync()
[DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")]
[DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")]
[DataRow(false, true, false, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: false")]
[DataRow(false, true, true, DisplayName = "Use State: false, Allow Splits: true, Allow Merges: true")]
[DataRow(true, false, false, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: false")]
[DataRow(true, false, true, DisplayName = "Use State: true, Allow Splits: false, Allow Merges: true")]
[DataRow(true, true, false, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: false")]
[DataRow(true, true, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")]
public async Task TestSplitAndMergeAsync(bool useState, bool allowSplits, bool allowMerges)
{
Implementation implementation = new Implementation();
await implementation.TestSplitWithResumeContinuationAsync();
await implementation.TestSplitAndMergeImplementationAsync(useState, allowSplits, allowMerges);
}

private sealed class Implementation : PartitionRangeEnumeratorTests<CrossFeedRangePage<ReadFeedPage, ReadFeedState>, CrossFeedRangeState<ReadFeedState>>
Expand All @@ -76,58 +61,62 @@ public Implementation()
}

[TestMethod]
public async Task TestSplitWithResumeContinuationAsync()
public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allowSplits, bool allowMerges)
{
int numItems = 1000;
IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems);
IAsyncEnumerator<TryCatch<CrossFeedRangePage<ReadFeedPage, ReadFeedState>>> enumerator = this.CreateEnumerator(inMemoryCollection);

(HashSet<string> firstDrainResults, CrossFeedRangeState<ReadFeedState> state) = await this.PartialDrainAsync(enumerator, numIterations: 3);

IReadOnlyList<FeedRangeInternal> ranges = await inMemoryCollection.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default);

// Split the partition we were reading from
await inMemoryCollection.SplitAsync(ranges.First(), cancellationToken: default);

// And a partition we have yet to read from
await inMemoryCollection.SplitAsync(ranges[ranges.Count / 2], cancellationToken: default);

// Resume from state
IAsyncEnumerable<TryCatch<CrossFeedRangePage<ReadFeedPage, ReadFeedState>>> enumerable = this.CreateEnumerable(inMemoryCollection, state);

HashSet<string> secondDrainResults = await this.DrainFullyAsync(enumerable);
Assert.AreEqual(numItems, firstDrainResults.Count + secondDrainResults.Count);
}

[TestMethod]
public async Task TestSplitWithDuringDrainAsync()
{
int numItems = 1000;
IDocumentContainer inMemoryCollection = await this.CreateDocumentContainerAsync(numItems);
IAsyncEnumerable<TryCatch<CrossFeedRangePage<ReadFeedPage, ReadFeedState>>> enumerable = this.CreateEnumerable(inMemoryCollection);

HashSet<string> identifiers = new HashSet<string>();
Random random = new Random();
await foreach (TryCatch<CrossFeedRangePage<ReadFeedPage, ReadFeedState>> tryGetPage in enumerable)
while (await enumerator.MoveNextAsync())
{
if (random.Next() % 2 == 0)
{
await inMemoryCollection.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default);
List<FeedRangeEpk> ranges = await inMemoryCollection.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default);
FeedRangeInternal randomRangeToSplit = ranges[random.Next(0, ranges.Count)];
await inMemoryCollection.SplitAsync(randomRangeToSplit, cancellationToken: default);
}

TryCatch<CrossFeedRangePage<ReadFeedPage, ReadFeedState>> tryGetPage = enumerator.Current;
tryGetPage.ThrowIfFailed();

IReadOnlyList<Record> records = this.GetRecordsFromPage(tryGetPage.Result);
foreach (Record record in records)
{
identifiers.Add(record.Identifier);
identifiers.Add(record.Payload["pk"].ToString());
}

if (useState)
{
if (tryGetPage.Result.State == null)
{
break;
}

enumerator = this.CreateEnumerator(inMemoryCollection, tryGetPage.Result.State);
}

if (random.Next() % 2 == 0)
{
if (allowSplits && (random.Next() % 2 == 0))
{
// Split
await inMemoryCollection.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default);
List<FeedRangeEpk> ranges = await inMemoryCollection.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default);
FeedRangeInternal randomRangeToSplit = ranges[random.Next(0, ranges.Count)];
await inMemoryCollection.SplitAsync(randomRangeToSplit, cancellationToken: default);
}

if (allowMerges && (random.Next() % 2 == 0))
{
// Merge
await inMemoryCollection.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default);
List<FeedRangeEpk> ranges = await inMemoryCollection.GetFeedRangesAsync(
trace: NoOpTrace.Singleton,
cancellationToken: default);
if (ranges.Count > 1)
{
ranges = ranges.OrderBy(range => range.Range.Min).ToList();
int indexToMerge = random.Next(0, ranges.Count);
int adjacentIndex = indexToMerge == (ranges.Count - 1) ? indexToMerge - 1 : indexToMerge + 1;
await inMemoryCollection.MergeAsync(ranges[indexToMerge], ranges[adjacentIndex], cancellationToken: default);
}
}
}
}

Expand Down Expand Up @@ -155,7 +144,7 @@ PartitionRangePageAsyncEnumerator<ReadFeedPage, ReadFeedState> createEnumerator(
maxConcurrency: 10,
state: state ?? new CrossFeedRangeState<ReadFeedState>(
new FeedRangeState<ReadFeedState>[]
{
{
new FeedRangeState<ReadFeedState>(FeedRangeEpk.FullRange, ReadFeedState.Beginning())
}));
}
Expand Down
Loading