-
Notifications
You must be signed in to change notification settings - Fork 494
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Cross Partition Execution Context Refactor (#1260)
* realized that this will be a lot of work * wired up continuation token on the write path * wiring continuation new continuation token in the read path * updated distinct and group by * drafted out code * wired in serialize state * fixed test cases * I give up * can't do it * added min max continuation token * removed TryGetContinuationToken * fixed random bugs * fixed more bugs * about to gut out RequestContinuationToken and just wire through CosmosElement * made input a cosmos element * returning continuation token instead * updated tests * resolved iteration comments * broke up parallel execution context into different files based on stages * refactored parrallel resume code * broke order by into separate files * refactored code * fixed bugs * resolved iteration comments * resolved iteration comments * Update Microsoft.Azure.Cosmos/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Resume.cs Co-Authored-By: j82w <j82w@users.noreply.github.com> * added tests and fixed off by one error * fixed typo Co-authored-by: j82w <j82w@users.noreply.github.com>
- Loading branch information
Showing
13 changed files
with
1,798 additions
and
1,382 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
Microsoft.Azure.Cosmos/src/Query/Core/ContinuationTokens/IPartitionedToken.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
//------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
//------------------------------------------------------------ | ||
|
||
namespace Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens | ||
{ | ||
internal interface IPartitionedToken | ||
{ | ||
Documents.Routing.Range<string> PartitionRange { get; } | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
247 changes: 125 additions & 122 deletions
247
...Azure.Cosmos/src/Query/Core/ExecutionContext/CosmosCrossPartitionQueryExecutionContext.cs
Large diffs are not rendered by default.
Oops, something went wrong.
149 changes: 149 additions & 0 deletions
149
...Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.ContinuationToken.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
// ------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// ------------------------------------------------------------ | ||
|
||
namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy | ||
{ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using Microsoft.Azure.Cosmos.CosmosElements; | ||
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; | ||
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; | ||
|
||
internal sealed partial class CosmosOrderByItemQueryExecutionContext | ||
{ | ||
/// <summary> | ||
/// Gets the continuation token for an order by query. | ||
/// </summary> | ||
protected override string ContinuationToken | ||
{ | ||
// In general the continuation token for order by queries contains the following information: | ||
// 1) What partition did we leave off on | ||
// 2) What value did we leave off | ||
// Along with the constraints that we get from how we drain the documents: | ||
// Let <x, y> mean that the last item we drained was item x from partition y. | ||
// Then we know that for all partitions | ||
// * < y that we have drained all items <= x | ||
// * > y that we have drained all items < x | ||
// * = y that we have drained all items <= x based on the backend continuation token for y | ||
// With this information we have captured the progress for all partitions in a single continuation token. | ||
get | ||
{ | ||
IEnumerable<ItemProducer> activeItemProducers = this.GetActiveItemProducers(); | ||
string continuationToken; | ||
if (activeItemProducers.Any()) | ||
{ | ||
IEnumerable<CosmosElement> orderByContinuationTokens = activeItemProducers.Select((itemProducer) => | ||
{ | ||
OrderByQueryResult orderByQueryResult = new OrderByQueryResult(itemProducer.Current); | ||
string filter = itemProducer.Filter; | ||
OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( | ||
new CompositeContinuationToken | ||
{ | ||
Token = itemProducer.PreviousContinuationToken, | ||
Range = itemProducer.PartitionKeyRange.ToRange(), | ||
}, | ||
orderByQueryResult.OrderByItems, | ||
orderByQueryResult.Rid, | ||
this.ShouldIncrementSkipCount(itemProducer) ? this.skipCount + 1 : 0, | ||
filter); | ||
return OrderByContinuationToken.ToCosmosElement(orderByContinuationToken); | ||
}); | ||
|
||
continuationToken = CosmosArray.Create(orderByContinuationTokens).ToString(); | ||
} | ||
else | ||
{ | ||
continuationToken = null; | ||
} | ||
|
||
// Note we are no longer escaping non ascii continuation tokens. | ||
// It is the callers job to encode a continuation token before adding it to a header in their service. | ||
|
||
return continuationToken; | ||
} | ||
} | ||
|
||
public override CosmosElement GetCosmosElementContinuationToken() | ||
{ | ||
IEnumerable<ItemProducer> activeItemProducers = this.GetActiveItemProducers(); | ||
if (!activeItemProducers.Any()) | ||
{ | ||
return default; | ||
} | ||
|
||
List<CosmosElement> orderByContinuationTokens = new List<CosmosElement>(); | ||
foreach (ItemProducer activeItemProducer in activeItemProducers) | ||
{ | ||
OrderByQueryResult orderByQueryResult = new OrderByQueryResult(activeItemProducer.Current); | ||
OrderByContinuationToken orderByContinuationToken = new OrderByContinuationToken( | ||
compositeContinuationToken: new CompositeContinuationToken() | ||
{ | ||
Token = activeItemProducer.PreviousContinuationToken, | ||
Range = new Documents.Routing.Range<string>( | ||
min: activeItemProducer.PartitionKeyRange.MinInclusive, | ||
max: activeItemProducer.PartitionKeyRange.MaxExclusive, | ||
isMinInclusive: true, | ||
isMaxInclusive: false) | ||
}, | ||
orderByItems: orderByQueryResult.OrderByItems, | ||
rid: orderByQueryResult.Rid, | ||
skipCount: this.ShouldIncrementSkipCount(activeItemProducer) ? this.skipCount + 1 : 0, | ||
filter: activeItemProducer.Filter); | ||
|
||
CosmosElement cosmosElementToken = OrderByContinuationToken.ToCosmosElement(orderByContinuationToken); | ||
orderByContinuationTokens.Add(cosmosElementToken); | ||
} | ||
|
||
return CosmosArray.Create(orderByContinuationTokens); | ||
} | ||
|
||
/// <summary> | ||
/// Equality comparer used to determine if a document producer needs it's continuation token returned. | ||
/// Basically just says that the continuation token can be flushed once you stop seeing duplicates. | ||
/// </summary> | ||
private sealed class OrderByEqualityComparer : IEqualityComparer<CosmosElement> | ||
{ | ||
/// <summary> | ||
/// The order by comparer. | ||
/// </summary> | ||
private readonly OrderByItemProducerTreeComparer orderByConsumeComparer; | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the OrderByEqualityComparer class. | ||
/// </summary> | ||
/// <param name="orderByConsumeComparer">The order by consume comparer.</param> | ||
public OrderByEqualityComparer(OrderByItemProducerTreeComparer orderByConsumeComparer) | ||
{ | ||
this.orderByConsumeComparer = orderByConsumeComparer ?? throw new ArgumentNullException($"{nameof(orderByConsumeComparer)} can not be null."); | ||
} | ||
|
||
/// <summary> | ||
/// Gets whether two OrderByQueryResult instances are equal. | ||
/// </summary> | ||
/// <param name="x">The first.</param> | ||
/// <param name="y">The second.</param> | ||
/// <returns>Whether two OrderByQueryResult instances are equal.</returns> | ||
public bool Equals(CosmosElement x, CosmosElement y) | ||
{ | ||
OrderByQueryResult orderByQueryResultX = new OrderByQueryResult(x); | ||
OrderByQueryResult orderByQueryResultY = new OrderByQueryResult(y); | ||
return this.orderByConsumeComparer.CompareOrderByItems( | ||
orderByQueryResultX.OrderByItems, | ||
orderByQueryResultY.OrderByItems) == 0; | ||
} | ||
|
||
/// <summary> | ||
/// Gets the hash code for object. | ||
/// </summary> | ||
/// <param name="obj">The object to hash.</param> | ||
/// <returns>The hash code for the OrderByQueryResult object.</returns> | ||
public int GetHashCode(CosmosElement obj) | ||
{ | ||
return 0; | ||
} | ||
} | ||
} | ||
} |
145 changes: 145 additions & 0 deletions
145
...s/src/Query/Core/ExecutionContext/OrderBy/CosmosOrderByItemQueryExecutionContext.Drain.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// ------------------------------------------------------------ | ||
// Copyright (c) Microsoft Corporation. All rights reserved. | ||
// ------------------------------------------------------------ | ||
|
||
namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy | ||
{ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Microsoft.Azure.Cosmos.CosmosElements; | ||
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers; | ||
using Microsoft.Azure.Cosmos.Query.Core.QueryClient; | ||
|
||
internal sealed partial class CosmosOrderByItemQueryExecutionContext | ||
{ | ||
/// <summary> | ||
/// Drains a page of documents from this context. | ||
/// </summary> | ||
/// <param name="maxElements">The maximum number of elements.</param> | ||
/// <param name="cancellationToken">The cancellation token.</param> | ||
/// <returns>A task that when awaited on return a page of documents.</returns> | ||
public override async Task<QueryResponseCore> DrainAsync(int maxElements, CancellationToken cancellationToken) | ||
{ | ||
cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
//// In order to maintain the continuation token for the user we must drain with a few constraints | ||
//// 1) We always drain from the partition, which has the highest priority item first | ||
//// 2) If multiple partitions have the same priority item then we drain from the left most first | ||
//// otherwise we would need to keep track of how many of each item we drained from each partition | ||
//// (just like parallel queries). | ||
//// Visually that look the following case where we have three partitions that are numbered and store letters. | ||
//// For teaching purposes I have made each item a tuple of the following form: | ||
//// <item stored in partition, partition number> | ||
//// So that duplicates across partitions are distinct, but duplicates within partitions are indistinguishable. | ||
//// |-------| |-------| |-------| | ||
//// | <a,1> | | <a,2> | | <a,3> | | ||
//// | <a,1> | | <b,2> | | <c,3> | | ||
//// | <a,1> | | <b,2> | | <c,3> | | ||
//// | <d,1> | | <c,2> | | <c,3> | | ||
//// | <d,1> | | <e,2> | | <f,3> | | ||
//// | <e,1> | | <h,2> | | <j,3> | | ||
//// | <f,1> | | <i,2> | | <k,3> | | ||
//// |-------| |-------| |-------| | ||
//// Now the correct drain order in this case is: | ||
//// <a,1>,<a,1>,<a,1>,<a,2>,<a,3>,<b,2>,<b,2>,<c,2>,<c,3>,<c,3>,<c,3>, | ||
//// <d,1>,<d,1>,<e,1>,<e,2>,<f,1>,<f,3>,<h,2>,<i,2>,<j,3>,<k,3> | ||
//// In more mathematical terms | ||
//// 1) <x, y> always comes before <z, y> where x < z | ||
//// 2) <i, j> always come before <i, k> where j < k | ||
|
||
List<CosmosElement> results = new List<CosmosElement>(); | ||
while (results.Count < maxElements) | ||
{ | ||
// Only drain from the highest priority document producer | ||
// We need to pop and push back the document producer tree, since the priority changes according to the sort order. | ||
ItemProducerTree currentItemProducerTree = this.PopCurrentItemProducerTree(); | ||
try | ||
{ | ||
if (!currentItemProducerTree.HasMoreResults) | ||
{ | ||
// This means there are no more items to drain | ||
break; | ||
} | ||
|
||
OrderByQueryResult orderByQueryResult = new OrderByQueryResult(currentItemProducerTree.Current); | ||
|
||
// Only add the payload, since other stuff is garbage from the caller's perspective. | ||
results.Add(orderByQueryResult.Payload); | ||
|
||
// If we are at the beginning of the page and seeing an rid from the previous page we should increment the skip count | ||
// due to the fact that JOINs can make a document appear multiple times and across continuations, so we don't want to | ||
// surface this more than needed. More information can be found in the continuation token docs. | ||
if (this.ShouldIncrementSkipCount(currentItemProducerTree.CurrentItemProducerTree.Root)) | ||
{ | ||
++this.skipCount; | ||
} | ||
else | ||
{ | ||
this.skipCount = 0; | ||
} | ||
|
||
this.previousRid = orderByQueryResult.Rid; | ||
this.previousOrderByItems = orderByQueryResult.OrderByItems; | ||
|
||
if (!currentItemProducerTree.TryMoveNextDocumentWithinPage()) | ||
{ | ||
while (true) | ||
{ | ||
(bool movedToNextPage, QueryResponseCore? failureResponse) = await currentItemProducerTree.TryMoveNextPageAsync(cancellationToken); | ||
if (!movedToNextPage) | ||
{ | ||
if (failureResponse.HasValue) | ||
{ | ||
// TODO: We can buffer this failure so that the user can still get the pages we already got. | ||
return failureResponse.Value; | ||
} | ||
|
||
break; | ||
} | ||
|
||
if (currentItemProducerTree.IsAtBeginningOfPage) | ||
{ | ||
break; | ||
} | ||
|
||
if (currentItemProducerTree.TryMoveNextDocumentWithinPage()) | ||
{ | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
finally | ||
{ | ||
this.PushCurrentItemProducerTree(currentItemProducerTree); | ||
} | ||
} | ||
|
||
return QueryResponseCore.CreateSuccess( | ||
result: results, | ||
requestCharge: this.requestChargeTracker.GetAndResetCharge(), | ||
activityId: null, | ||
responseLengthBytes: this.GetAndResetResponseLengthBytes(), | ||
disallowContinuationTokenMessage: null, | ||
continuationToken: this.ContinuationToken, | ||
diagnostics: this.GetAndResetDiagnostics()); | ||
} | ||
|
||
/// <summary> | ||
/// Gets whether or not we should increment the skip count based on the rid of the document. | ||
/// </summary> | ||
/// <param name="currentItemProducer">The current document producer.</param> | ||
/// <returns>Whether or not we should increment the skip count.</returns> | ||
private bool ShouldIncrementSkipCount(ItemProducer currentItemProducer) | ||
{ | ||
// If we are not at the beginning of the page and we saw the same rid again. | ||
return !currentItemProducer.IsAtBeginningOfPage && | ||
string.Equals( | ||
this.previousRid, | ||
new OrderByQueryResult(currentItemProducer.Current).Rid, | ||
StringComparison.Ordinal); | ||
} | ||
} | ||
} |
Oops, something went wrong.