-
Notifications
You must be signed in to change notification settings - Fork 504
/
Copy pathCosmosOrderByItemQueryExecutionContext.ContinuationToken.cs
149 lines (135 loc) · 7.16 KB
/
CosmosOrderByItemQueryExecutionContext.ContinuationToken.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------
namespace Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.OrderBy
{
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens;
using Microsoft.Azure.Cosmos.Query.Core.ExecutionContext.ItemProducers;
internal sealed partial class CosmosOrderByItemQueryExecutionContext
{
/// <summary>
/// Gets the continuation token for an order by query, serialized as a string.
/// The value is null when there are no active item producers.
/// </summary>
protected override string ContinuationToken
{
    // An order by continuation token captures two things:
    //   1) the partition we left off on
    //   2) the value we left off on
    // Combined with the way documents are drained, this is enough to describe every partition.
    // Let <x, y> mean that the last item drained was item x from partition y.
    // Then for every partition that is
    //   * < y, all items <= x have been drained
    //   * > y, all items < x have been drained
    //   * = y, all items <= x have been drained, based on the backend continuation token for y
    // So a single continuation token records the progress of all partitions.
    get
    {
        IEnumerable<ItemProducer> producers = this.GetActiveItemProducers();
        if (!producers.Any())
        {
            return null;
        }

        // Builds the order by continuation token element for a single item producer.
        CosmosElement BuildTokenElement(ItemProducer producer)
        {
            OrderByQueryResult lastResult = new OrderByQueryResult(producer.Current);
            string producerFilter = producer.Filter;
            CompositeContinuationToken compositeToken = new CompositeContinuationToken
            {
                Token = producer.PreviousContinuationToken,
                Range = producer.PartitionKeyRange.ToRange(),
            };

            OrderByContinuationToken token = new OrderByContinuationToken(
                compositeToken,
                lastResult.OrderByItems,
                lastResult.Rid,
                this.ShouldIncrementSkipCount(producer) ? this.skipCount + 1 : 0,
                producerFilter);

            return OrderByContinuationToken.ToCosmosElement(token);
        }

        // Projection stays lazy; it is enumerated when the array is created below.
        IEnumerable<CosmosElement> tokenElements = producers.Select((producer) => BuildTokenElement(producer));

        // Note: non-ASCII continuation tokens are no longer escaped here.
        // It is the caller's job to encode a continuation token before adding it to a header in their service.
        return CosmosArray.Create(tokenElements).ToString();
    }
}
/// <summary>
/// Gets the continuation token for an order by query as a <see cref="CosmosElement"/>.
/// </summary>
/// <returns>
/// An array containing one order by continuation token per active item producer,
/// or the default value when there are no active item producers.
/// </returns>
public override CosmosElement GetCosmosElementContinuationToken()
{
    IEnumerable<ItemProducer> producers = this.GetActiveItemProducers();
    if (producers.Any())
    {
        // Materialize the tokens up front so the array is built from a fixed list.
        List<CosmosElement> tokenElements = producers
            .Select((producer) =>
            {
                OrderByQueryResult lastResult = new OrderByQueryResult(producer.Current);

                // The partition key range is expressed as [MinInclusive, MaxExclusive).
                CompositeContinuationToken compositeToken = new CompositeContinuationToken()
                {
                    Token = producer.PreviousContinuationToken,
                    Range = new Documents.Routing.Range<string>(
                        min: producer.PartitionKeyRange.MinInclusive,
                        max: producer.PartitionKeyRange.MaxExclusive,
                        isMinInclusive: true,
                        isMaxInclusive: false)
                };

                OrderByContinuationToken token = new OrderByContinuationToken(
                    compositeContinuationToken: compositeToken,
                    orderByItems: lastResult.OrderByItems,
                    rid: lastResult.Rid,
                    skipCount: this.ShouldIncrementSkipCount(producer) ? this.skipCount + 1 : 0,
                    filter: producer.Filter);

                CosmosElement element = OrderByContinuationToken.ToCosmosElement(token);
                return element;
            })
            .ToList();

        return CosmosArray.Create(tokenElements);
    }

    return default;
}
/// <summary>
/// Equality comparer used to determine if a document producer needs its continuation token returned.
/// Basically just says that the continuation token can be flushed once you stop seeing duplicates.
/// </summary>
private sealed class OrderByEqualityComparer : IEqualityComparer<CosmosElement>
{
    /// <summary>
    /// The order by comparer used to compare the order by items of two elements.
    /// </summary>
    private readonly OrderByItemProducerTreeComparer orderByConsumeComparer;

    /// <summary>
    /// Initializes a new instance of the OrderByEqualityComparer class.
    /// </summary>
    /// <param name="orderByConsumeComparer">The order by consume comparer.</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="orderByConsumeComparer"/> is null.</exception>
    public OrderByEqualityComparer(OrderByItemProducerTreeComparer orderByConsumeComparer)
    {
        // The single-string ArgumentNullException constructor expects the parameter name,
        // not a message, so pass the name itself to get a correct ParamName.
        this.orderByConsumeComparer = orderByConsumeComparer ?? throw new ArgumentNullException(nameof(orderByConsumeComparer));
    }

    /// <summary>
    /// Gets whether two elements are equal with respect to their order by items.
    /// Each element is wrapped in an OrderByQueryResult and only the order by items are compared.
    /// </summary>
    /// <param name="x">The first element.</param>
    /// <param name="y">The second element.</param>
    /// <returns>True if the order by comparer reports the order by items of both elements as equal (comparison result of 0).</returns>
    public bool Equals(CosmosElement x, CosmosElement y)
    {
        OrderByQueryResult orderByQueryResultX = new OrderByQueryResult(x);
        OrderByQueryResult orderByQueryResultY = new OrderByQueryResult(y);
        return this.orderByConsumeComparer.CompareOrderByItems(
            orderByQueryResultX.OrderByItems,
            orderByQueryResultY.OrderByItems) == 0;
    }

    /// <summary>
    /// Gets the hash code for an element.
    /// </summary>
    /// <param name="obj">The element to hash.</param>
    /// <returns>Always 0.</returns>
    public int GetHashCode(CosmosElement obj)
    {
        // A constant hash keeps the contract "equal elements have equal hash codes" trivially true,
        // at the cost of every element colliding, so hash-based lookups must rely on Equals.
        // NOTE(review): presumably intentional because equality is defined by the order by comparer - confirm.
        return 0;
    }
}
}
}