-
Notifications
You must be signed in to change notification settings - Fork 494
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Internal ReadFeed: Adds pagination library adoption #1947
Changes from all commits
04bf9a8
2fa3e20
baea898
984541a
6d0c558
e2f4d59
4a46f03
20eb017
386187f
a12a4e1
2dfcadd
d7ac465
982c860
1cf2c05
94c926f
57d5aaf
d49c742
416b907
2df1a95
20072af
993ec04
3e6a67d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,28 +18,32 @@ namespace Microsoft.Azure.Cosmos.Pagination | |
using Microsoft.Azure.Cosmos.Query.Core.Monads; | ||
using Microsoft.Azure.Cosmos.Query.Core.Pipeline; | ||
using Microsoft.Azure.Cosmos.Query.Core.QueryClient; | ||
using Microsoft.Azure.Cosmos.ReadFeed.Pagination; | ||
using Microsoft.Azure.Documents; | ||
|
||
internal sealed class NetworkAttachedDocumentContainer : IMonadicDocumentContainer | ||
{ | ||
private readonly ContainerCore container; | ||
private readonly ContainerInternal container; | ||
private readonly CosmosQueryClient cosmosQueryClient; | ||
private readonly CosmosClientContext cosmosClientContext; | ||
private readonly QueryRequestOptions queryRequestOptions; | ||
private readonly CosmosDiagnosticsContext diagnosticsContext; | ||
private readonly string resourceLink; | ||
private readonly ResourceType resourceType; | ||
|
||
public NetworkAttachedDocumentContainer( | ||
ContainerCore container, | ||
ContainerInternal container, | ||
CosmosQueryClient cosmosQueryClient, | ||
CosmosClientContext cosmosClientContext, | ||
CosmosDiagnosticsContext diagnosticsContext, | ||
QueryRequestOptions queryRequestOptions = null) | ||
QueryRequestOptions queryRequestOptions = null, | ||
string resourceLink = null, | ||
ResourceType resourceType = ResourceType.Document) | ||
{ | ||
this.container = container ?? throw new ArgumentNullException(nameof(container)); | ||
this.cosmosQueryClient = cosmosQueryClient ?? throw new ArgumentNullException(nameof(cosmosQueryClient)); | ||
this.cosmosClientContext = cosmosClientContext ?? throw new ArgumentNullException(nameof(cosmosClientContext)); | ||
this.diagnosticsContext = diagnosticsContext; | ||
this.queryRequestOptions = queryRequestOptions; | ||
this.resourceLink = resourceLink ?? this.container.LinkUri; | ||
this.resourceType = resourceType; | ||
} | ||
|
||
public Task<TryCatch> MonadicSplitAsync( | ||
|
@@ -91,7 +95,7 @@ public async Task<TryCatch<List<FeedRangeEpk>>> MonadicGetChildRangeAsync( | |
{ | ||
try | ||
{ | ||
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync( | ||
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync( | ||
this.container.LinkUri, | ||
cancellationToken); | ||
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync( | ||
|
@@ -113,13 +117,90 @@ await this.container.GetRIDAsync(cancellationToken), | |
} | ||
} | ||
|
||
public Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync( | ||
public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync( | ||
ReadFeedState readFeedState, | ||
FeedRangeInternal feedRange, | ||
ResourceId resourceIdentifer, | ||
QueryRequestOptions queryRequestOptions, | ||
bchong95 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
int pageSize, | ||
CancellationToken cancellationToken) | ||
{ | ||
throw new NotImplementedException(); | ||
cancellationToken.ThrowIfCancellationRequested(); | ||
|
||
if (feedRange is FeedRangeEpk feedRangeEpk) | ||
{ | ||
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync( | ||
this.container.LinkUri, | ||
cancellationToken); | ||
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync( | ||
this.container.LinkUri, | ||
await this.container.GetRIDAsync(cancellationToken), | ||
containerProperties.PartitionKey, | ||
feedRange); | ||
|
||
if ((overlappingRanges == null) || (overlappingRanges.Count != 1)) | ||
{ | ||
// Simulate a split exception, since we don't have a partition key range id to route to. | ||
CosmosException goneException = new CosmosException( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this to force the cache refresh? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we instead force a cache refresh here? Instead of throwing the exception up and hoping it gets caught and handled? |
||
message: $"Epk Range: {feedRangeEpk.Range} is gone.", | ||
statusCode: System.Net.HttpStatusCode.Gone, | ||
subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone, | ||
activityId: Guid.NewGuid().ToString(), | ||
requestCharge: default); | ||
|
||
return TryCatch<ReadFeedPage>.FromException(goneException); | ||
} | ||
} | ||
|
||
if (queryRequestOptions != null) | ||
{ | ||
queryRequestOptions.MaxItemCount = pageSize; | ||
} | ||
|
||
ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync( | ||
resourceUri: this.resourceLink, | ||
resourceType: this.resourceType, | ||
operationType: OperationType.ReadFeed, | ||
requestOptions: queryRequestOptions, | ||
cosmosContainerCore: this.container, | ||
requestEnricher: request => | ||
{ | ||
if (!(readFeedState.ContinuationToken is CosmosNull)) | ||
{ | ||
request.Headers.ContinuationToken = (readFeedState.ContinuationToken as CosmosString).Value; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to guard against any potential nullref by checking if the continuation is a CosmosString in the upper if?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
feedRange.Accept(FeedRangeRequestMessagePopulatorVisitor.Singleton, request); | ||
}, | ||
partitionKey: queryRequestOptions?.PartitionKey, | ||
streamPayload: default, | ||
diagnosticsContext: this.diagnosticsContext, | ||
cancellationToken: cancellationToken); | ||
|
||
TryCatch<ReadFeedPage> monadicReadFeedPage; | ||
if (responseMessage.StatusCode == HttpStatusCode.OK) | ||
{ | ||
ReadFeedPage readFeedPage = new ReadFeedPage( | ||
responseMessage.Content, | ||
responseMessage.Headers.RequestCharge, | ||
responseMessage.Headers.ActivityId, | ||
responseMessage.Headers.ContinuationToken != null ? new ReadFeedState(CosmosString.Create(responseMessage.Headers.ContinuationToken)) : null); | ||
|
||
monadicReadFeedPage = TryCatch<ReadFeedPage>.FromResult(readFeedPage); | ||
} | ||
else | ||
{ | ||
CosmosException cosmosException = new CosmosException( | ||
responseMessage.ErrorMessage, | ||
statusCode: responseMessage.StatusCode, | ||
(int)responseMessage.Headers.SubStatusCode, | ||
responseMessage.Headers.ActivityId, | ||
responseMessage.Headers.RequestCharge); | ||
cosmosException.Headers.ContinuationToken = responseMessage.Headers.ContinuationToken; | ||
|
||
monadicReadFeedPage = TryCatch<ReadFeedPage>.FromException(cosmosException); | ||
} | ||
|
||
return monadicReadFeedPage; | ||
} | ||
|
||
public async Task<TryCatch<QueryPage>> MonadicQueryAsync( | ||
|
@@ -135,7 +216,7 @@ public async Task<TryCatch<QueryPage>> MonadicQueryAsync( | |
{ | ||
case FeedRangePartitionKey feedRangePartitionKey: | ||
{ | ||
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync( | ||
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync( | ||
this.container.LinkUri, | ||
cancellationToken); | ||
PartitionKeyDefinition partitionKeyDefinition = await this.container.GetPartitionKeyDefinitionAsync(cancellationToken); | ||
|
@@ -163,8 +244,8 @@ await this.container.GetRIDAsync(cancellationToken), | |
queryRequestOptions.PartitionKey = feedRangePartitionKey.PartitionKey; | ||
|
||
monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync( | ||
this.container.LinkUri, | ||
Documents.ResourceType.Document, | ||
this.resourceLink, | ||
this.resourceType, | ||
Documents.OperationType.Query, | ||
Guid.NewGuid(), | ||
queryRequestOptions, | ||
|
@@ -183,8 +264,8 @@ await this.container.GetRIDAsync(cancellationToken), | |
case FeedRangePartitionKeyRange feedRangePartitionKeyRange: | ||
{ | ||
monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync( | ||
this.container.LinkUri, | ||
Documents.ResourceType.Document, | ||
this.resourceLink, | ||
this.resourceType, | ||
Documents.OperationType.Query, | ||
Guid.NewGuid(), | ||
requestOptions: queryRequestOptions, | ||
|
@@ -202,7 +283,7 @@ await this.container.GetRIDAsync(cancellationToken), | |
|
||
case FeedRangeEpk feedRangeEpk: | ||
{ | ||
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync( | ||
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync( | ||
this.container.LinkUri, | ||
cancellationToken); | ||
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync( | ||
|
@@ -225,8 +306,8 @@ await this.container.GetRIDAsync(cancellationToken), | |
} | ||
|
||
monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync( | ||
this.container.LinkUri, | ||
Documents.ResourceType.Document, | ||
this.resourceLink, | ||
this.resourceType, | ||
Documents.OperationType.Query, | ||
Guid.NewGuid(), | ||
requestOptions: queryRequestOptions, | ||
|
@@ -260,7 +341,7 @@ public async Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync( | |
if (feedRange is FeedRangeEpk feedRangeEpk) | ||
{ | ||
// convert into physical range or throw a split exception | ||
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync( | ||
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync( | ||
this.container.LinkUri, | ||
cancellationToken); | ||
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync( | ||
|
@@ -285,7 +366,7 @@ await this.container.GetRIDAsync(cancellationToken), | |
feedRange = new FeedRangePartitionKeyRange(overlappingRanges[0].Id); | ||
} | ||
|
||
ResponseMessage responseMessage = await this.cosmosClientContext.ProcessResourceOperationStreamAsync( | ||
ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync( | ||
resourceUri: this.container.LinkUri, | ||
resourceType: ResourceType.Document, | ||
operationType: OperationType.ReadFeed, | ||
|
@@ -303,7 +384,7 @@ await this.container.GetRIDAsync(cancellationToken), | |
}, | ||
partitionKey: default, | ||
streamPayload: default, | ||
diagnosticsContext: default, | ||
diagnosticsContext: this.diagnosticsContext, | ||
cancellationToken: cancellationToken); | ||
|
||
TryCatch<ChangeFeedPage> monadicChangeFeedPage; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please run the MockedItemBenchmark.ReadFeed benchmark