Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal ReadFeed: Adds pagination library adoption #1947

Merged
merged 22 commits into from
Nov 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition.Parallel;
using Microsoft.Azure.Cosmos.Routing;

internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal
Expand Down
19 changes: 12 additions & 7 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Documents;

/// <summary>
Expand Down Expand Up @@ -83,24 +84,28 @@ public Task<Record> ReadItemAsync(
cancellationToken),
cancellationToken);

public Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync(
public Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please run the MockedItemBenchmark.ReadFeed benchmark

ReadFeedState readFeedState,
FeedRangeInternal feedRange,
ResourceId resourceIdentifer,
QueryRequestOptions queryRequestOptions,
int pageSize,
CancellationToken cancellationToken) => this.monadicDocumentContainer.MonadicReadFeedAsync(
readFeedState,
feedRange,
resourceIdentifer,
queryRequestOptions,
pageSize,
cancellationToken);

public Task<DocumentContainerPage> ReadFeedAsync(
public Task<ReadFeedPage> ReadFeedAsync(
ReadFeedState readFeedState,
FeedRangeInternal feedRange,
ResourceId resourceIdentifier,
QueryRequestOptions queryRequestOptions,
int pageSize,
CancellationToken cancellationToken) => TryCatch<DocumentContainerPage>.UnsafeGetResultAsync(
CancellationToken cancellationToken) => TryCatch<ReadFeedPage>.UnsafeGetResultAsync(
this.MonadicReadFeedAsync(
readFeedState,
feedRange,
resourceIdentifier,
queryRequestOptions,
pageSize,
cancellationToken),
cancellationToken);
Expand Down
22 changes: 0 additions & 22 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainerPage.cs

This file was deleted.

18 changes: 0 additions & 18 deletions Microsoft.Azure.Cosmos/src/Pagination/DocumentContainerState.cs

This file was deleted.

10 changes: 2 additions & 8 deletions Microsoft.Azure.Cosmos/src/Pagination/IDocumentContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.ChangeFeed.Pagination;
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;

internal interface IDocumentContainer : IMonadicDocumentContainer, IFeedRangeProvider, IQueryDataSource, IChangeFeedDataSource
internal interface IDocumentContainer : IMonadicDocumentContainer, IFeedRangeProvider, IQueryDataSource, IReadFeedDataSource, IChangeFeedDataSource
{
Task<Record> CreateItemAsync(
CosmosObject payload,
Expand All @@ -22,12 +22,6 @@ Task<Record> ReadItemAsync(
string identifier,
CancellationToken cancellationToken);

Task<DocumentContainerPage> ReadFeedAsync(
FeedRangeInternal feedRange,
ResourceId resourceIdentifier,
int pageSize,
CancellationToken cancellationToken);

Task SplitAsync(
FeedRangeInternal feedRange,
CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.CosmosElements;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline.CrossPartition;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;

internal interface IMonadicDocumentContainer : IMonadicFeedRangeProvider, IMonadicQueryDataSource, IMonadicChangeFeedDataSource
internal interface IMonadicDocumentContainer : IMonadicFeedRangeProvider, IMonadicQueryDataSource, IMonadicReadFeedDataSource, IMonadicChangeFeedDataSource
{
Task<TryCatch<Record>> MonadicCreateItemAsync(
CosmosObject payload,
Expand All @@ -23,12 +23,6 @@ Task<TryCatch<Record>> MonadicReadItemAsync(
string identifer,
CancellationToken cancellationToken);

Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync(
FeedRangeInternal feedRange,
ResourceId resourceIdentifer,
int pageSize,
CancellationToken cancellationToken);

Task<TryCatch> MonadicSplitAsync(
FeedRangeInternal feedRange,
CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,32 @@ namespace Microsoft.Azure.Cosmos.Pagination
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.Pipeline;
using Microsoft.Azure.Cosmos.Query.Core.QueryClient;
using Microsoft.Azure.Cosmos.ReadFeed.Pagination;
using Microsoft.Azure.Documents;

internal sealed class NetworkAttachedDocumentContainer : IMonadicDocumentContainer
{
private readonly ContainerCore container;
private readonly ContainerInternal container;
private readonly CosmosQueryClient cosmosQueryClient;
private readonly CosmosClientContext cosmosClientContext;
private readonly QueryRequestOptions queryRequestOptions;
private readonly CosmosDiagnosticsContext diagnosticsContext;
private readonly string resourceLink;
private readonly ResourceType resourceType;

public NetworkAttachedDocumentContainer(
ContainerCore container,
ContainerInternal container,
CosmosQueryClient cosmosQueryClient,
CosmosClientContext cosmosClientContext,
CosmosDiagnosticsContext diagnosticsContext,
QueryRequestOptions queryRequestOptions = null)
QueryRequestOptions queryRequestOptions = null,
string resourceLink = null,
ResourceType resourceType = ResourceType.Document)
{
this.container = container ?? throw new ArgumentNullException(nameof(container));
this.cosmosQueryClient = cosmosQueryClient ?? throw new ArgumentNullException(nameof(cosmosQueryClient));
this.cosmosClientContext = cosmosClientContext ?? throw new ArgumentNullException(nameof(cosmosClientContext));
this.diagnosticsContext = diagnosticsContext;
this.queryRequestOptions = queryRequestOptions;
this.resourceLink = resourceLink ?? this.container.LinkUri;
this.resourceType = resourceType;
}

public Task<TryCatch> MonadicSplitAsync(
Expand Down Expand Up @@ -91,7 +95,7 @@ public async Task<TryCatch<List<FeedRangeEpk>>> MonadicGetChildRangeAsync(
{
try
{
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
Expand All @@ -113,13 +117,90 @@ await this.container.GetRIDAsync(cancellationToken),
}
}

public Task<TryCatch<DocumentContainerPage>> MonadicReadFeedAsync(
public async Task<TryCatch<ReadFeedPage>> MonadicReadFeedAsync(
ReadFeedState readFeedState,
FeedRangeInternal feedRange,
ResourceId resourceIdentifer,
QueryRequestOptions queryRequestOptions,
bchong95 marked this conversation as resolved.
Show resolved Hide resolved
int pageSize,
CancellationToken cancellationToken)
{
throw new NotImplementedException();
cancellationToken.ThrowIfCancellationRequested();

if (feedRange is FeedRangeEpk feedRangeEpk)
{
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
this.container.LinkUri,
await this.container.GetRIDAsync(cancellationToken),
containerProperties.PartitionKey,
feedRange);

if ((overlappingRanges == null) || (overlappingRanges.Count != 1))
{
// Simulate a split exception, since we don't have a partition key range id to route to.
CosmosException goneException = new CosmosException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this to force the cache refresh?

Copy link
Member

@ealsur ealsur Nov 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we instead force a cache refresh here? Instead of throwing the exception up and hoping it gets caught and handled?

message: $"Epk Range: {feedRangeEpk.Range} is gone.",
statusCode: System.Net.HttpStatusCode.Gone,
subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone,
activityId: Guid.NewGuid().ToString(),
requestCharge: default);

return TryCatch<ReadFeedPage>.FromException(goneException);
}
}

if (queryRequestOptions != null)
{
queryRequestOptions.MaxItemCount = pageSize;
}

ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.resourceLink,
resourceType: this.resourceType,
operationType: OperationType.ReadFeed,
requestOptions: queryRequestOptions,
cosmosContainerCore: this.container,
requestEnricher: request =>
{
if (!(readFeedState.ContinuationToken is CosmosNull))
{
request.Headers.ContinuationToken = (readFeedState.ContinuationToken as CosmosString).Value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to guard against any potential nullref by checking if the continuation is a CosmosString in the upper if?

if (!(readFeedState.ContinuationToken is CosmosNull) && readFeedState.Continuation is CosmosString continuation)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add polymorphism to the type.


In reply to: 516222386 [](ancestors = 516222386)

}

feedRange.Accept(FeedRangeRequestMessagePopulatorVisitor.Singleton, request);
},
partitionKey: queryRequestOptions?.PartitionKey,
streamPayload: default,
diagnosticsContext: this.diagnosticsContext,
cancellationToken: cancellationToken);

TryCatch<ReadFeedPage> monadicReadFeedPage;
if (responseMessage.StatusCode == HttpStatusCode.OK)
{
ReadFeedPage readFeedPage = new ReadFeedPage(
responseMessage.Content,
responseMessage.Headers.RequestCharge,
responseMessage.Headers.ActivityId,
responseMessage.Headers.ContinuationToken != null ? new ReadFeedState(CosmosString.Create(responseMessage.Headers.ContinuationToken)) : null);

monadicReadFeedPage = TryCatch<ReadFeedPage>.FromResult(readFeedPage);
}
else
{
CosmosException cosmosException = new CosmosException(
responseMessage.ErrorMessage,
statusCode: responseMessage.StatusCode,
(int)responseMessage.Headers.SubStatusCode,
responseMessage.Headers.ActivityId,
responseMessage.Headers.RequestCharge);
cosmosException.Headers.ContinuationToken = responseMessage.Headers.ContinuationToken;

monadicReadFeedPage = TryCatch<ReadFeedPage>.FromException(cosmosException);
}

return monadicReadFeedPage;
}

public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
Expand All @@ -135,7 +216,7 @@ public async Task<TryCatch<QueryPage>> MonadicQueryAsync(
{
case FeedRangePartitionKey feedRangePartitionKey:
{
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
PartitionKeyDefinition partitionKeyDefinition = await this.container.GetPartitionKeyDefinitionAsync(cancellationToken);
Expand Down Expand Up @@ -163,8 +244,8 @@ await this.container.GetRIDAsync(cancellationToken),
queryRequestOptions.PartitionKey = feedRangePartitionKey.PartitionKey;

monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync(
this.container.LinkUri,
Documents.ResourceType.Document,
this.resourceLink,
this.resourceType,
Documents.OperationType.Query,
Guid.NewGuid(),
queryRequestOptions,
Expand All @@ -183,8 +264,8 @@ await this.container.GetRIDAsync(cancellationToken),
case FeedRangePartitionKeyRange feedRangePartitionKeyRange:
{
monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync(
this.container.LinkUri,
Documents.ResourceType.Document,
this.resourceLink,
this.resourceType,
Documents.OperationType.Query,
Guid.NewGuid(),
requestOptions: queryRequestOptions,
Expand All @@ -202,7 +283,7 @@ await this.container.GetRIDAsync(cancellationToken),

case FeedRangeEpk feedRangeEpk:
{
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
Expand All @@ -225,8 +306,8 @@ await this.container.GetRIDAsync(cancellationToken),
}

monadicQueryPage = await this.cosmosQueryClient.ExecuteItemQueryAsync(
this.container.LinkUri,
Documents.ResourceType.Document,
this.resourceLink,
this.resourceType,
Documents.OperationType.Query,
Guid.NewGuid(),
requestOptions: queryRequestOptions,
Expand Down Expand Up @@ -260,7 +341,7 @@ public async Task<TryCatch<ChangeFeedPage>> MonadicChangeFeedAsync(
if (feedRange is FeedRangeEpk feedRangeEpk)
{
// convert into physical range or throw a split exception
ContainerProperties containerProperties = await this.cosmosClientContext.GetCachedContainerPropertiesAsync(
ContainerProperties containerProperties = await this.container.ClientContext.GetCachedContainerPropertiesAsync(
this.container.LinkUri,
cancellationToken);
List<PartitionKeyRange> overlappingRanges = await this.cosmosQueryClient.GetTargetPartitionKeyRangeByFeedRangeAsync(
Expand All @@ -285,7 +366,7 @@ await this.container.GetRIDAsync(cancellationToken),
feedRange = new FeedRangePartitionKeyRange(overlappingRanges[0].Id);
}

ResponseMessage responseMessage = await this.cosmosClientContext.ProcessResourceOperationStreamAsync(
ResponseMessage responseMessage = await this.container.ClientContext.ProcessResourceOperationStreamAsync(
resourceUri: this.container.LinkUri,
resourceType: ResourceType.Document,
operationType: OperationType.ReadFeed,
Expand All @@ -303,7 +384,7 @@ await this.container.GetRIDAsync(cancellationToken),
},
partitionKey: default,
streamPayload: default,
diagnosticsContext: default,
diagnosticsContext: this.diagnosticsContext,
cancellationToken: cancellationToken);

TryCatch<ChangeFeedPage> monadicChangeFeedPage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public static QueryIterator Create(
NetworkAttachedDocumentContainer networkAttachedDocumentContainer = new NetworkAttachedDocumentContainer(
containerCore,
client,
clientContext,
queryPipelineCreationDiagnostics,
queryRequestOptions);
DocumentContainer documentContainer = new DocumentContainer(networkAttachedDocumentContainer);
Expand Down
Loading