From 1e6051d3b3ba987184cb1ec246acff0705201eeb Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Fri, 13 Mar 2020 13:37:12 -0700 Subject: [PATCH] Preview - FeedToken support for ReadFeed (#1230) * GetTokens implementation * ChangeFeed implementation * Rename * FeedTokenIterator * GetChangeFeedStreamIterator * Bucket extension * Using bucket * FeedTokenIterator of T * Fixing bucket * Change Feed for PK * Emulator tests * Feedtoken for PKRangeId * Tests * FromString * GetPartitionKeyRanges * Preview only * Comments * Bucket tests * Comments * Removing FeedTokenIterator * Removing Bucket util * undo file * Refactor access * Initial * FeedTokenIteratorCore * Tests * Initial ReadFeed tests * New CF test * Retry on empty OK * IsDone * IsDone fix * IsDone tests * Accessors for INTERNAL * More tests * Fixes * Refactoring for ReadFeed Iterator * Possible PK * Conditional behavior * Simplifying * FeedToken on non-partitioned resources * Removing FeedTokenIteratorCore * Adding more tests * Of T * diagnostics * More tests * merge with master * Adding queryText * Note * Continuation update * Adding test * Using TryCatch * Update Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs Co-Authored-By: j82w * Comments * Nullhandling * Adding Gone passthrough and tests * Removing duplicated code * Adding more tests * OverallScope * Nullcheck * Test * comments * Removing PKRangeGoneRetryHandler * is pattern * ReadFeed with PK test * EPKString check * More tests * exception handling * Diagnostics rename * usings * merge conflicts * comments Co-authored-by: j82w --- .../src/ClientResources.Designer.cs | 18 + .../src/ClientResources.resx | 6 + Microsoft.Azure.Cosmos/src/CosmosClient.cs | 14 +- .../src/FeedIteratorCore.cs | 155 ++++++- .../src/FeedTokenEPKRange.cs | 42 +- .../src/FeedTokenInternal.cs | 7 +- .../src/FeedTokenPartitionKey.cs | 12 + .../src/FeedTokenPartitionKeyRange.cs | 23 + .../src/Handler/ClientPipelineBuilder.cs | 15 +- .../PartitionKeyRangeGoneRetryHandler.cs | 36 -- .../src/Linq/CosmosLinqQuery.cs | 1 + .../src/Resource/Conflict/ConflictsCore.cs | 13 +- .../src/Resource/Container/Container.cs | 184 ++++++++ .../Resource/Container/ContainerCore.Items.cs | 104 ++++- .../src/Resource/Container/ContainerCore.cs | 6 - .../Resource/Container/ContainerInlineCore.cs | 32 ++ .../src/Resource/Database/DatabaseCore.cs | 42 +- .../src/Resource/Offer/CosmosOffers.cs | 2 +- .../QueryResponses/ChangeFeedIteratorCore.cs | 63 ++- .../src/Resource/Scripts/ScriptsCore.cs | 36 +- .../src/Resource/User/UserCore.cs | 12 +- .../FeedToken/ChangeFeedIteratorCoreTests.cs | 31 ++ .../FeedToken/ReadFeedTokenTests.cs | 311 +++++++++++++ .../FeedToken/ChangeFeedIteratorCoreTests.cs | 135 +++++- .../FeedToken/FeedTokenTests.cs | 91 +++- .../ReadFeedTokenIteratorCoreTests.cs | 434 ++++++++++++++++++ .../ResponseMessageTests.cs | 34 ++ .../RetryHandlerTests.cs | 50 -- 28 files changed, 1672 insertions(+), 237 deletions(-) delete mode 100644 Microsoft.Azure.Cosmos/src/Handler/PartitionKeyRangeGoneRetryHandler.cs create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ReadFeedTokenTests.cs create mode 100644 Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ReadFeedTokenIteratorCoreTests.cs diff --git a/Microsoft.Azure.Cosmos/src/ClientResources.Designer.cs b/Microsoft.Azure.Cosmos/src/ClientResources.Designer.cs index 05e47c93fa..9ad89b365c 100644 --- a/Microsoft.Azure.Cosmos/src/ClientResources.Designer.cs +++ b/Microsoft.Azure.Cosmos/src/ClientResources.Designer.cs @@ -330,6 +330,24 @@ internal static string FeedToken_CannotParse { } } + /// + /// Looks up a localized string similar to Cannot define EffectivePartitionKeyRouting and FeedToken simultaneously.. + /// + internal static string FeedToken_EffectivePartitionKeyRouting { + get { + return ResourceManager.GetString("FeedToken_EffectivePartitionKeyRouting", resourceCulture); + } + } + + /// + /// Looks up a localized string similar to Expected FeedTokenInternal instance.. + /// + internal static string FeedToken_InvalidImplementation { + get { + return ResourceManager.GetString("FeedToken_InvalidImplementation", resourceCulture); + } + } + /// /// Looks up a localized string similar to Provide a value greater than zero.. /// diff --git a/Microsoft.Azure.Cosmos/src/ClientResources.resx b/Microsoft.Azure.Cosmos/src/ClientResources.resx index bcee9c3974..564b498e2f 100644 --- a/Microsoft.Azure.Cosmos/src/ClientResources.resx +++ b/Microsoft.Azure.Cosmos/src/ClientResources.resx @@ -318,4 +318,10 @@ Cannot parse '{0}' as a valid FeedToken. + + Cannot define EffectivePartitionKeyRouting and FeedToken simultaneously. + + + Expected FeedTokenInternal instance. + \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/CosmosClient.cs b/Microsoft.Azure.Cosmos/src/CosmosClient.cs index 28bcc794a5..176e9aef96 100644 --- a/Microsoft.Azure.Cosmos/src/CosmosClient.cs +++ b/Microsoft.Azure.Cosmos/src/CosmosClient.cs @@ -864,13 +864,13 @@ private FeedIteratorInternal GetDatabaseQueryStreamIteratorHelper( QueryRequestOptions requestOptions = null) { this.ThrowIfDisposed(); - return new FeedIteratorCore( - this.ClientContext, - this.DatabaseRootUri, - ResourceType.Database, - queryDefinition, - continuationToken, - requestOptions); + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.ClientContext, + resourceLink: this.DatabaseRootUri, + resourceType: ResourceType.Database, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } /// diff --git a/Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs index ed6b43a49e..ba5491a698 100644 --- a/Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/FeedIteratorCore.cs @@ -12,6 +12,8 @@ namespace Microsoft.Azure.Cosmos using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; using Microsoft.Azure.Cosmos.Query.Core; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.Azure.Documents; using static Microsoft.Azure.Documents.RuntimeConstants; @@ -20,25 +22,75 @@ namespace Microsoft.Azure.Cosmos /// internal sealed class FeedIteratorCore : FeedIteratorInternal { + private readonly ContainerCore containerCore; private readonly CosmosClientContext clientContext; private readonly Uri resourceLink; private readonly ResourceType resourceType; private readonly SqlQuerySpec querySpec; private bool hasMoreResultsInternal; + private FeedTokenInternal feedTokenInternal; - public FeedIteratorCore( + internal static FeedIteratorCore CreateForNonPartitionedResource( CosmosClientContext clientContext, Uri resourceLink, ResourceType resourceType, QueryDefinition queryDefinition, string continuationToken, QueryRequestOptions options) + { + return new FeedIteratorCore( + clientContext: clientContext, + containerCore: null, + resourceLink: resourceLink, + resourceType: resourceType, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + feedTokenInternal: null, + options: options); + } + + internal static FeedIteratorCore CreateForPartitionedResource( + ContainerCore containerCore, + Uri resourceLink, + ResourceType resourceType, + QueryDefinition queryDefinition, + string continuationToken, + FeedTokenInternal feedTokenInternal, + QueryRequestOptions options) + { + if (containerCore == null) + { + throw new ArgumentNullException(nameof(containerCore)); + } + + return new FeedIteratorCore( + containerCore: containerCore, + clientContext: containerCore.ClientContext, + resourceLink: resourceLink, + resourceType: resourceType, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + feedTokenInternal: feedTokenInternal, + options: options); + } + + private FeedIteratorCore( + ContainerCore containerCore, + CosmosClientContext clientContext, + Uri resourceLink, + ResourceType resourceType, + QueryDefinition queryDefinition, + string continuationToken, + FeedTokenInternal feedTokenInternal, + QueryRequestOptions options) { this.resourceLink = resourceLink; + this.containerCore = containerCore; this.clientContext = clientContext; this.resourceType = resourceType; this.querySpec = queryDefinition?.ToSqlQuerySpec(); - this.ContinuationToken = continuationToken; + this.feedTokenInternal = feedTokenInternal; + this.ContinuationToken = continuationToken ?? this.feedTokenInternal?.GetContinuation(); this.requestOptions = options; this.hasMoreResultsInternal = true; } @@ -50,7 +102,7 @@ public override #else internal #endif - FeedToken FeedToken => throw new NotImplementedException(); + FeedToken FeedToken => this.feedTokenInternal; /// /// The query options for the result set @@ -67,7 +119,18 @@ public override /// /// (Optional) representing request cancellation. /// A query response from cosmos service - public override async Task ReadNextAsync(CancellationToken cancellationToken = default) + public override Task ReadNextAsync(CancellationToken cancellationToken = default) + { + CosmosDiagnosticsContext diagnostics = CosmosDiagnosticsContext.Create(this.requestOptions); + using (diagnostics.CreateOverallScope("FeedReadNextAsync")) + { + return this.ReadNextInternalAsync(diagnostics, cancellationToken); + } + } + + private async Task ReadNextInternalAsync( + CosmosDiagnosticsContext diagnostics, + CancellationToken cancellationToken = default) { cancellationToken.ThrowIfCancellationRequested(); @@ -75,10 +138,29 @@ public override async Task ReadNextAsync(CancellationToken canc OperationType operation = OperationType.ReadFeed; if (this.querySpec != null) { - stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType); + stream = this.clientContext.SerializerCore.ToStreamSqlQuerySpec(this.querySpec, this.resourceType); operation = OperationType.Query; } + if (this.feedTokenInternal == null) + { + TryCatch tryCatchFeedTokeninternal = await this.TryInitializeFeedTokenAsync(cancellationToken); + if (!tryCatchFeedTokeninternal.Succeeded) + { + if (tryCatchFeedTokeninternal.Exception.InnerException is CosmosException cosmosException) + { + return cosmosException.ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnostics)); + } + + return CosmosExceptionFactory.CreateInternalServerErrorException( + message: tryCatchFeedTokeninternal.Exception.InnerException.Message, + innerException: tryCatchFeedTokeninternal.Exception.InnerException, + diagnosticsContext: diagnostics).ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnostics)); + } + + this.feedTokenInternal = tryCatchFeedTokeninternal.Result; + } + ResponseMessage response = await this.clientContext.ProcessResourceOperationStreamAsync( resourceUri: this.resourceLink, resourceType: this.resourceType, @@ -95,26 +177,63 @@ public override async Task ReadNextAsync(CancellationToken canc request.Headers.Add(HttpConstants.HttpHeaders.ContentType, MediaTypes.QueryJson); request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString); } + + this.feedTokenInternal?.EnrichRequest(request); }, - diagnosticsScope: null, + diagnosticsScope: diagnostics, cancellationToken: cancellationToken); - this.ContinuationToken = response.Headers.ContinuationToken; - this.hasMoreResultsInternal = GetHasMoreResults(this.ContinuationToken, response.StatusCode); + // Retry in case of splits or other scenarios only on partitioned resources + if (this.containerCore != null + && await this.feedTokenInternal.ShouldRetryAsync(this.containerCore, response, cancellationToken)) + { + return await this.ReadNextInternalAsync(diagnostics, cancellationToken); + } + + if (response.IsSuccessStatusCode) + { + this.feedTokenInternal.UpdateContinuation(response.Headers.ContinuationToken); + this.ContinuationToken = this.feedTokenInternal.GetContinuation(); + this.hasMoreResultsInternal = !this.feedTokenInternal.IsDone; + } + else + { + this.hasMoreResultsInternal = false; + } + return response; } - internal static string GetContinuationToken(ResponseMessage httpResponseMessage) + private async Task> TryInitializeFeedTokenAsync(CancellationToken cancellationToken) { - return httpResponseMessage.Headers.ContinuationToken; - } + string containerRId = string.Empty; + if (this.containerCore != null) + { + try + { + containerRId = await this.containerCore.GetRIDAsync(cancellationToken); + } + catch (Exception cosmosException) + { + return TryCatch.FromException(cosmosException); + } + } - internal static bool GetHasMoreResults(string continuationToken, HttpStatusCode statusCode) - { - // this logic might not be sufficient composite continuation token https://msdata.visualstudio.com/CosmosDB/SDK/_workitems/edit/269099 - // in the case where this is a result set iterator for a change feed, not modified indicates that - // the enumeration is done for now. - return continuationToken != null && statusCode != HttpStatusCode.NotModified; + // Create FeedToken for the full Range + FeedTokenEPKRange feedTokenInternal = new FeedTokenEPKRange( + containerRId, + new PartitionKeyRange() + { + MinInclusive = Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, + MaxExclusive = Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey + }); + // Initialize with the ContinuationToken that the user passed, if any + if (this.ContinuationToken != null) + { + feedTokenInternal.UpdateContinuation(this.ContinuationToken); + } + + return TryCatch.FromResult(feedTokenInternal); } public override CosmosElement GetCosmsoElementContinuationToken() @@ -164,4 +283,4 @@ public override async Task> ReadNextAsync(CancellationToken canc return this.responseCreator(response); } } -} \ No newline at end of file +} diff --git a/Microsoft.Azure.Cosmos/src/FeedTokenEPKRange.cs b/Microsoft.Azure.Cosmos/src/FeedTokenEPKRange.cs index fb8dcd837e..9dc465c715 100644 --- a/Microsoft.Azure.Cosmos/src/FeedTokenEPKRange.cs +++ b/Microsoft.Azure.Cosmos/src/FeedTokenEPKRange.cs @@ -18,14 +18,16 @@ internal sealed class FeedTokenEPKRange : FeedTokenInternal { internal readonly Queue CompositeContinuationTokens; internal readonly Documents.Routing.Range CompleteRange; + private readonly HashSet doneRanges; private CompositeContinuationToken currentToken; - private string initialNotModifiedRange; + private string initialNoResultsRange; private FeedTokenEPKRange( string containerRid) : base(containerRid) { this.CompositeContinuationTokens = new Queue(); + this.doneRanges = new HashSet(); } private FeedTokenEPKRange( @@ -122,8 +124,12 @@ public override void EnrichRequest(RequestMessage request) throw new ArgumentNullException(nameof(request)); } - request.Properties[HandlerConstants.StartEpkString] = this.currentToken.Range.Min; - request.Properties[HandlerConstants.EndEpkString] = this.currentToken.Range.Max; + // in case EPK has already been set + if (!request.Properties.ContainsKey(HandlerConstants.StartEpkString)) + { + request.Properties[HandlerConstants.StartEpkString] = this.currentToken.Range.Min; + request.Properties[HandlerConstants.EndEpkString] = this.currentToken.Range.Max; + } } public override string GetContinuation() => this.currentToken.Token; @@ -135,10 +141,23 @@ public override string ToString() public override void UpdateContinuation(string continuationToken) { + if (continuationToken == null) + { + // Queries and normal ReadFeed can signal termination by CT null, not NotModified + // Change Feed never lands here, as it always provides a CT + // Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done + this.doneRanges.Add(this.currentToken.Range.Min); + } + this.currentToken.Token = continuationToken; this.MoveToNextToken(); } + /// + /// The concept of Done is only for Query and ReadFeed. Change Feed is never done, it is an infinite stream. + /// + public override bool IsDone => this.doneRanges.Count == this.CompositeContinuationTokens.Count; + public override async Task ShouldRetryAsync( ContainerCore containerCore, ResponseMessage responseMessage, @@ -146,22 +165,24 @@ public override async Task ShouldRetryAsync( { if (responseMessage.IsSuccessStatusCode) { - this.initialNotModifiedRange = null; + this.initialNoResultsRange = null; return false; } + // If the current response is NotModified (ChangeFeed), try and skip to a next one if (responseMessage.StatusCode == HttpStatusCode.NotModified && this.CompositeContinuationTokens.Count > 1) { - if (this.initialNotModifiedRange == null) + if (this.initialNoResultsRange == null) { - this.initialNotModifiedRange = this.currentToken.Range.Min; + this.initialNoResultsRange = this.currentToken.Range.Min; return true; } - return !this.initialNotModifiedRange.Equals(this.currentToken.Range.Min, StringComparison.OrdinalIgnoreCase); + return !this.initialNoResultsRange.Equals(this.currentToken.Range.Min, StringComparison.OrdinalIgnoreCase); } + // Split handling bool partitionSplit = responseMessage.StatusCode == HttpStatusCode.Gone && (responseMessage.Headers.SubStatusCode == Documents.SubStatusCodes.PartitionKeyRangeGone || responseMessage.Headers.SubStatusCode == Documents.SubStatusCodes.CompletingSplit); if (partitionSplit) @@ -220,6 +241,13 @@ private void MoveToNextToken() CompositeContinuationToken recentToken = this.CompositeContinuationTokens.Dequeue(); this.CompositeContinuationTokens.Enqueue(recentToken); this.currentToken = this.CompositeContinuationTokens.Peek(); + + // In a Query / ReadFeed not Change Feed, skip ranges that are done to avoid requests + while (!this.IsDone && + this.doneRanges.Contains(this.currentToken.Range.Min)) + { + this.MoveToNextToken(); + } } private void HandleSplit(IReadOnlyList keyRanges) diff --git a/Microsoft.Azure.Cosmos/src/FeedTokenInternal.cs b/Microsoft.Azure.Cosmos/src/FeedTokenInternal.cs index 95ae6aea9c..335e1d9b76 100644 --- a/Microsoft.Azure.Cosmos/src/FeedTokenInternal.cs +++ b/Microsoft.Azure.Cosmos/src/FeedTokenInternal.cs @@ -19,11 +19,6 @@ public FeedTokenInternal() public FeedTokenInternal(string containerRid) { - if (string.IsNullOrEmpty(containerRid)) - { - throw new ArgumentNullException(nameof(containerRid)); - } - this.ContainerRid = containerRid; } @@ -33,6 +28,8 @@ public FeedTokenInternal(string containerRid) public abstract void UpdateContinuation(string continuationToken); + public abstract bool IsDone { get; } + public static bool TryParse( string toStringValue, out FeedToken parsedToken) diff --git a/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKey.cs b/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKey.cs index 7ac725892c..c06c6764dc 100644 --- a/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKey.cs +++ b/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKey.cs @@ -12,6 +12,7 @@ internal sealed class FeedTokenPartitionKey : FeedTokenInternal { internal readonly PartitionKey PartitionKey; private string continuationToken; + private bool isDone = false; public FeedTokenPartitionKey(PartitionKey partitionKey) { @@ -30,6 +31,8 @@ public override void EnrichRequest(RequestMessage request) public override string GetContinuation() => this.continuationToken; + public override bool IsDone => this.isDone; + public override string ToString() { return JsonConvert.SerializeObject(this); @@ -37,6 +40,15 @@ public override string ToString() public override void UpdateContinuation(string continuationToken) { + if (continuationToken == null) + { + // Queries and normal ReadFeed can signal termination by CT null, not NotModified + // Change Feed never lands here, as it always provides a CT + + // Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done + this.isDone = true; + } + this.continuationToken = continuationToken; } diff --git a/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKeyRange.cs b/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKeyRange.cs index e7b3dc4b72..bda34bfa12 100644 --- a/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKeyRange.cs +++ b/Microsoft.Azure.Cosmos/src/FeedTokenPartitionKeyRange.cs @@ -25,6 +25,7 @@ internal sealed class FeedTokenPartitionKeyRange : FeedTokenInternal internal readonly string PartitionKeyRangeId; internal FeedTokenEPKRange FeedTokenEPKRange; // If the initial token splits, it will use this token; private string continuationToken; + private bool isDone; public FeedTokenPartitionKeyRange(string partitionKeyRangeId) { @@ -77,6 +78,15 @@ public override void UpdateContinuation(string continuationToken) { if (this.FeedTokenEPKRange == null) { + if (continuationToken == null) + { + // Queries and normal ReadFeed can signal termination by CT null, not NotModified + // Change Feed never lands here, as it always provides a CT + + // Consider current range done, if this FeedToken contains multiple ranges due to splits, all of them need to be considered done + this.isDone = true; + } + this.continuationToken = continuationToken; } else @@ -85,6 +95,19 @@ public override void UpdateContinuation(string continuationToken) } } + public override bool IsDone + { + get + { + if (this.FeedTokenEPKRange == null) + { + return this.isDone; + } + + return this.FeedTokenEPKRange.IsDone; + } + } + public static bool TryParseInstance(string toStringValue, out FeedToken feedToken) { try diff --git a/Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs b/Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs index b7937863f4..21b512c329 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/ClientPipelineBuilder.cs @@ -15,7 +15,6 @@ internal class ClientPipelineBuilder private readonly CosmosClient client; private readonly RequestHandler invalidPartitionExceptionRetryHandler; private readonly RequestHandler transportHandler; - private readonly RequestHandler partitionKeyRangeGoneRetryHandler; private IReadOnlyCollection customHandlers; private RequestHandler retryHandler; @@ -27,9 +26,6 @@ public ClientPipelineBuilder( this.transportHandler = new TransportHandler(client); Debug.Assert(this.transportHandler.InnerHandler == null, nameof(this.transportHandler)); - this.partitionKeyRangeGoneRetryHandler = new PartitionKeyRangeGoneRetryHandler(this.client); - Debug.Assert(this.partitionKeyRangeGoneRetryHandler.InnerHandler == null, "The partitionKeyRangeGoneRetryHandler.InnerHandler must be null to allow other handlers to be linked."); - this.invalidPartitionExceptionRetryHandler = new NamedCacheRetryHandler(); Debug.Assert(this.invalidPartitionExceptionRetryHandler.InnerHandler == null, "The invalidPartitionExceptionRetryHandler.InnerHandler must be null to allow other handlers to be linked."); @@ -101,14 +97,6 @@ private set /// | /// +---------------------------------------+ /// | | - /// | partitionKeyRangeGoneRetryHandler | - /// | | - /// +---------------------------------------+ - /// | - /// | - /// | - /// +---------------------------------------+ - /// | | /// | PartitionKeyRangeHandler | /// | | /// +---------------------------------------+ @@ -155,7 +143,7 @@ public RequestInvokerHandler Build() return root; } - private static RequestHandler CreatePipeline(params RequestHandler[] requestHandlers) + internal static RequestHandler CreatePipeline(params RequestHandler[] requestHandlers) { RequestHandler head = null; int handlerCount = requestHandlers.Length; @@ -195,7 +183,6 @@ private RequestHandler CreateDocumentFeedPipeline() RequestHandler[] feedPipeline = new RequestHandler[] { this.invalidPartitionExceptionRetryHandler, - this.partitionKeyRangeGoneRetryHandler, this.PartitionKeyRangeHandler, this.transportHandler, }; diff --git a/Microsoft.Azure.Cosmos/src/Handler/PartitionKeyRangeGoneRetryHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/PartitionKeyRangeGoneRetryHandler.cs deleted file mode 100644 index 76a886b9e6..0000000000 --- a/Microsoft.Azure.Cosmos/src/Handler/PartitionKeyRangeGoneRetryHandler.cs +++ /dev/null @@ -1,36 +0,0 @@ -//------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -//------------------------------------------------------------ - -namespace Microsoft.Azure.Cosmos.Handlers -{ - using System; - using System.Threading.Tasks; - using Microsoft.Azure.Documents; - - /// - /// Handler to ensure that CollectionCache and PartitionRoutingMap for a given collection exists - /// - internal class PartitionKeyRangeGoneRetryHandler : AbstractRetryHandler - { - private readonly CosmosClient client; - - public PartitionKeyRangeGoneRetryHandler(CosmosClient client) - { - if (client == null) - { - throw new ArgumentNullException(nameof(client)); - } - this.client = client; - } - - internal override async Task GetRetryPolicyAsync(RequestMessage request) - { - return new PartitionKeyRangeGoneRetryPolicy( - await this.client.DocumentClient.GetCollectionCacheAsync(), - await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(), - PathsHelper.GetCollectionPath(request.RequestUri.ToString()), - null); - } - } -} diff --git a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs index 625d523ca8..940dad6db6 100644 --- a/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs +++ b/Microsoft.Azure.Cosmos/src/Linq/CosmosLinqQuery.cs @@ -219,6 +219,7 @@ private FeedIteratorInternal CreateStreamIterator(bool isContinuationExcpected) sqlQuerySpec: querySpec, isContinuationExcpected: isContinuationExcpected, continuationToken: this.continuationToken, + feedToken: null, requestOptions: this.cosmosQueryRequestOptions); } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Conflict/ConflictsCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Conflict/ConflictsCore.cs index 93f92d4169..ae82c3efa5 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Conflict/ConflictsCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Conflict/ConflictsCore.cs @@ -101,13 +101,14 @@ public override FeedIterator GetConflictQueryStreamIterator( string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.clientContext, + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.clientContext, this.container.LinkUri, - ResourceType.Conflict, - queryDefinition, - continuationToken, - requestOptions); + resourceType: ResourceType.Conflict, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); + } public override FeedIterator GetConflictQueryIterator( diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs index 6ab36d622f..d190ac77cc 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/Container.cs @@ -1318,6 +1318,190 @@ public abstract FeedIterator GetChangeFeedIterator( public abstract Task> GetPartitionKeyRangesAsync( FeedToken feedToken, CancellationToken cancellationToken = default(CancellationToken)); + + /// + /// This method creates a query for items under a container in an Azure Cosmos database using a SQL statement with parameterized values. It returns a FeedIterator. + /// For more information on preparing SQL statements with parameterized values, please see . + /// + /// A FeedToken obtained from or from a previous FeedTokenIterator + /// The Cosmos SQL query definition. + /// (Optional) The options for the item query request. + /// An iterator to go through the items. + /// + /// Query as a stream only supports single partition queries + /// + /// + /// Create a query to get all the ToDoActivity that have a cost greater than 9000 for the specified partition + /// + /// feedTokens = await this.Container.GetFeedTokensAsync(); + /// // Distribute feedTokens across multiple compute units and pass each one to a different iterator + /// QueryDefinition queryDefinition = new QueryDefinition("select * from ToDos t where t.cost > @expensive") + /// .WithParameter("@expensive", 9000); + /// FeedIterator feedIterator = this.Container.GetItemQueryStreamIterator( + /// queryDefinition, + /// feedTokens[0], + /// new QueryRequestOptions() { }); + /// + /// while (feedIterator.HasMoreResults) + /// { + /// using (ResponseMessage response = await feedIterator.ReadNextAsync()) + /// { + /// using (StreamReader sr = new StreamReader(response.Content)) + /// using (JsonTextReader jtr = new JsonTextReader(sr)) + /// { + /// JObject result = JObject.Load(jtr); + /// } + /// } + /// } + /// ]]> + /// + /// + public abstract FeedIterator GetItemQueryStreamIterator( + FeedToken feedToken, + QueryDefinition queryDefinition, + QueryRequestOptions requestOptions = null); + + /// + /// This method creates a query for items under a container in an Azure Cosmos database using a SQL statement with parameterized values. It returns a FeedIterator. + /// For more information on preparing SQL statements with parameterized values, please see . + /// + /// A FeedToken obtained from or from a previous FeedTokenIterator + /// The Cosmos SQL query definition. + /// (Optional) The options for the item query request. + /// An iterator to go through the items. + /// + /// Query as a stream only supports single partition queries + /// + /// + /// Create a query to get all the ToDoActivity that have a cost greater than 9000 for the specified partition + /// + /// feedTokens = await this.Container.GetFeedTokensAsync(); + /// // Distribute feedTokens across multiple compute units and pass each one to a different iterator + /// QueryDefinition queryDefinition = new QueryDefinition("select * from ToDos t where t.cost > @expensive") + /// .WithParameter("@expensive", 9000); + /// FeedIterator feedIterator = this.Container.GetItemQueryIterator( + /// feedTokens[0], + /// queryDefinition, + /// new QueryRequestOptions() { }); + /// + /// while (feedIterator.HasMoreResults) + /// { + /// foreach(var item in await feedIterator.ReadNextAsync()){ + /// { + /// Console.WriteLine(item.cost); + /// } + /// } + /// ]]> + /// + /// + public abstract FeedIterator GetItemQueryIterator( + FeedToken feedToken, + QueryDefinition queryDefinition, + QueryRequestOptions requestOptions = null); + + /// + /// This method creates a query for items under a container in an Azure Cosmos database using a SQL statement with parameterized values. It returns a FeedIterator. + /// For more information on preparing SQL statements with parameterized values, please see . + /// + /// A FeedToken obtained from or from a previous FeedTokenIterator + /// The Cosmos SQL query text. + /// (Optional) The options for the item query request. + /// An iterator to go through the items. + /// + /// Query as a stream only supports single partition queries + /// + /// + /// Create a query to get all the ToDoActivity that have a cost greater than 9000 for the specified partition + /// + /// feedTokens = await this.Container.GetFeedTokensAsync(); + /// // Distribute feedTokens across multiple compute units and pass each one to a different iterator + /// QueryDefinition queryDefinition = new QueryDefinition("select * from ToDos t where t.cost > @expensive") + /// .WithParameter("@expensive", 9000); + /// FeedIterator feedIterator = this.Container.GetItemQueryStreamIterator( + /// queryDefinition, + /// feedTokens[0], + /// new QueryRequestOptions() { }); + /// + /// while (feedIterator.HasMoreResults) + /// { + /// using (ResponseMessage response = await feedIterator.ReadNextAsync()) + /// { + /// using (StreamReader sr = new StreamReader(response.Content)) + /// using (JsonTextReader jtr = new JsonTextReader(sr)) + /// { + /// JObject result = JObject.Load(jtr); + /// } + /// } + /// } + /// ]]> + /// + /// + public abstract FeedIterator GetItemQueryStreamIterator( + FeedToken feedToken, + string queryText = null, + QueryRequestOptions requestOptions = null); + + /// + /// This method creates a query for items under a container in an Azure Cosmos database using a SQL statement with parameterized values. It returns a FeedIterator. + /// For more information on preparing SQL statements with parameterized values, please see . + /// + /// A FeedToken obtained from or from a previous FeedTokenIterator + /// The Cosmos SQL query text. + /// (Optional) The options for the item query request. + /// An iterator to go through the items. + /// + /// Query as a stream only supports single partition queries + /// + /// + /// Create a query to get all the ToDoActivity that have a cost greater than 9000 for the specified partition + /// + /// feedTokens = await this.Container.GetFeedTokensAsync(); + /// // Distribute feedTokens across multiple compute units and pass each one to a different iterator + /// QueryDefinition queryDefinition = new QueryDefinition("select * from ToDos t where t.cost > @expensive") + /// .WithParameter("@expensive", 9000); + /// FeedIterator feedIterator = this.Container.GetItemQueryIterator( + /// feedTokens[0], + /// queryDefinition, + /// new QueryRequestOptions() { }); + /// + /// while (feedIterator.HasMoreResults) + /// { + /// foreach(var item in await feedIterator.ReadNextAsync()){ + /// { + /// Console.WriteLine(item.cost); + /// } + /// } + /// ]]> + /// + /// + public abstract FeedIterator GetItemQueryIterator( + FeedToken feedToken, + string queryText = null, + QueryRequestOptions requestOptions = null); #endif } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs index 67bd64c672..2dfe3fb015 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.Items.cs @@ -247,6 +247,7 @@ public override FeedIterator GetItemQueryStreamIterator( sqlQuerySpec: queryDefinition?.ToSqlQuerySpec(), isContinuationExcpected: true, continuationToken: continuationToken, + feedToken: null, requestOptions: requestOptions); } @@ -390,6 +391,98 @@ public override IOrderedQueryable GetItemLinqQueryable( this.ClientContext.ClientOptions.SerializerOptions); } +#if PREVIEW + public override +#else + internal +#endif + FeedIterator GetItemQueryIterator( + FeedToken feedToken, + QueryDefinition queryDefinition, + QueryRequestOptions requestOptions = null) + { + requestOptions = requestOptions ?? new QueryRequestOptions(); + + if (!(this.GetItemQueryStreamIterator( + feedToken, + queryDefinition, + requestOptions) is FeedIteratorInternal feedIterator)) + { + throw new InvalidOperationException($"Expected a FeedIteratorInternal."); + } + + return new FeedIteratorCore( + feedIterator: feedIterator, + responseCreator: this.ClientContext.ResponseFactory.CreateQueryFeedUserTypeResponse); + } + +#if PREVIEW + public override +#else + internal +#endif + FeedIterator GetItemQueryStreamIterator( + FeedToken feedToken, + QueryDefinition queryDefinition, + QueryRequestOptions requestOptions = null) + { + if (feedToken is FeedTokenInternal feedTokenInternal) + { + return this.GetItemQueryStreamIteratorInternal( + sqlQuerySpec: queryDefinition?.ToSqlQuerySpec(), + isContinuationExcpected: true, + continuationToken: null, + feedToken: feedTokenInternal, + requestOptions: requestOptions); + } + + throw new ArgumentException(nameof(feedToken), ClientResources.FeedToken_InvalidImplementation); + } + +#if PREVIEW + public override +#else + internal +#endif + FeedIterator GetItemQueryIterator( + FeedToken feedToken, + string queryText = null, + QueryRequestOptions requestOptions = null) + { + QueryDefinition queryDefinition = null; + if (queryText != null) + { + queryDefinition = new QueryDefinition(queryText); + } + + return this.GetItemQueryIterator( + feedToken, + queryDefinition, + requestOptions); + } + +#if PREVIEW + public override +#else + internal +#endif + FeedIterator GetItemQueryStreamIterator( + FeedToken feedToken, + string queryText = null, + QueryRequestOptions requestOptions = null) + { + QueryDefinition queryDefinition = null; + if (queryText != null) + { + queryDefinition = new QueryDefinition(queryText); + } + + return this.GetItemQueryStreamIterator( + feedToken, + queryDefinition, + requestOptions); + } + public override ChangeFeedProcessorBuilder GetChangeFeedProcessorBuilder( string processorName, ChangesHandler onChangesDelegate) @@ -482,23 +575,30 @@ internal FeedIteratorInternal GetItemQueryStreamIteratorInternal( SqlQuerySpec sqlQuerySpec, bool isContinuationExcpected, string continuationToken, + FeedTokenInternal feedToken, QueryRequestOptions requestOptions) { requestOptions = requestOptions ?? new QueryRequestOptions(); if (requestOptions.IsEffectivePartitionKeyRouting) { + if (feedToken != null) + { + throw new ArgumentException(nameof(feedToken), ClientResources.FeedToken_EffectivePartitionKeyRouting); + } + requestOptions.PartitionKey = null; } if (sqlQuerySpec == null) { - return new FeedIteratorCore( - this.ClientContext, + return FeedIteratorCore.CreateForPartitionedResource( + this, this.LinkUri, resourceType: ResourceType.Document, queryDefinition: null, continuationToken: continuationToken, + feedTokenInternal: feedToken, options: requestOptions); } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs index b64ebd395a..865a9cfedd 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerCore.cs @@ -235,7 +235,6 @@ public override FeedIterator GetChangeFeedStreamIterator(ChangeFeedRequestOptions changeFeedRequestOptions = null) { return new ChangeFeedIteratorCore( - this.ClientContext, this, changeFeedRequestOptions); } @@ -251,7 +250,6 @@ FeedIterator GetChangeFeedStreamIterator( { FeedTokenInternal feedTokenInternal = feedToken as FeedTokenInternal; return new ChangeFeedIteratorCore( - this.ClientContext, this, feedTokenInternal, changeFeedRequestOptions); @@ -267,7 +265,6 @@ FeedIterator GetChangeFeedStreamIterator( ChangeFeedRequestOptions changeFeedRequestOptions = null) { return new ChangeFeedIteratorCore( - this.ClientContext, this, new FeedTokenPartitionKey(partitionKey), changeFeedRequestOptions); @@ -281,7 +278,6 @@ public override FeedIterator GetChangeFeedIterator(ChangeFeedRequestOptions changeFeedRequestOptions = null) { ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( - this.ClientContext, this, changeFeedRequestOptions); @@ -299,7 +295,6 @@ FeedIterator GetChangeFeedIterator( { FeedTokenInternal feedTokenInternal = feedToken as FeedTokenInternal; ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( - this.ClientContext, this, feedTokenInternal, changeFeedRequestOptions); @@ -317,7 +312,6 @@ FeedIterator GetChangeFeedIterator( ChangeFeedRequestOptions changeFeedRequestOptions = null) { ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore( - this.ClientContext, this, new FeedTokenPartitionKey(partitionKey), changeFeedRequestOptions); diff --git a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs index 52c0f52c31..f4fedb7cd8 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Container/ContainerInlineCore.cs @@ -312,6 +312,38 @@ public override Task> GetPartitionKeyRangesAsync( { return TaskHelper.RunInlineIfNeededAsync(() => this.container.GetPartitionKeyRangesAsync(feedToken, cancellationToken)); } + + public override FeedIterator GetItemQueryStreamIterator( + FeedToken feedToken, + string queryText = null, + QueryRequestOptions requestOptions = null) + { + return this.container.GetItemQueryStreamIterator(feedToken, queryText, requestOptions); + } + + public override FeedIterator GetItemQueryIterator( + FeedToken feedToken, + string queryText = null, + QueryRequestOptions requestOptions = null) + { + return this.container.GetItemQueryIterator(feedToken, queryText, requestOptions); + } + + public override FeedIterator GetItemQueryStreamIterator( + FeedToken feedToken, + QueryDefinition queryDefinition, + QueryRequestOptions requestOptions = null) + { + return this.container.GetItemQueryStreamIterator(feedToken, queryDefinition, requestOptions); + } + + public override FeedIterator GetItemQueryIterator( + FeedToken feedToken, + QueryDefinition queryDefinition, + QueryRequestOptions requestOptions = null) + { + return this.container.GetItemQueryIterator(feedToken, queryDefinition, requestOptions); + } #endif public static implicit operator ContainerCore(ContainerInlineCore containerInlineCore) => containerInlineCore.container; } diff --git a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs index 8183ba0d01..899206fb10 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Database/DatabaseCore.cs @@ -416,13 +416,13 @@ public override FeedIterator GetContainerQueryStreamIterator( string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.ClientContext, - this.LinkUri, - ResourceType.Collection, - queryDefinition, - continuationToken, - requestOptions); + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.ClientContext, + resourceLink: this.LinkUri, + resourceType: ResourceType.Collection, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } public override FeedIterator GetContainerQueryIterator( @@ -470,13 +470,13 @@ public FeedIterator GetUserQueryStreamIterator(QueryDefinition queryDefinition, string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.ClientContext, - this.LinkUri, - ResourceType.User, - queryDefinition, - continuationToken, - requestOptions); + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.ClientContext, + resourceLink: this.LinkUri, + resourceType: ResourceType.User, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } public override FeedIterator GetUserQueryIterator(string queryText = null, @@ -603,13 +603,13 @@ internal FeedIterator GetDataEncryptionKeyStreamIterator( requestOptions.EnumerationDirection = isDescending ? EnumerationDirection.Reverse : EnumerationDirection.Forward; } - return new FeedIteratorCore( - clientContext: this.ClientContext, - resourceLink: this.LinkUri, - resourceType: ResourceType.ClientEncryptionKey, - queryDefinition: null, - continuationToken: continuationToken, - options: requestOptions); + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.ClientContext, + resourceLink: this.LinkUri, + resourceType: ResourceType.ClientEncryptionKey, + queryDefinition: null, + continuationToken: continuationToken, + options: requestOptions); } #if PREVIEW diff --git a/Microsoft.Azure.Cosmos/src/Resource/Offer/CosmosOffers.cs b/Microsoft.Azure.Cosmos/src/Resource/Offer/CosmosOffers.cs index 58310ab1c1..753cfc1336 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Offer/CosmosOffers.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Offer/CosmosOffers.cs @@ -181,7 +181,7 @@ internal virtual FeedIterator GetOfferQueryStreamIterator( QueryRequestOptions requestOptions = null, CancellationToken cancellationToken = default(CancellationToken)) { - return new FeedIteratorCore( + return FeedIteratorCore.CreateForNonPartitionedResource( clientContext: this.ClientContext, resourceLink: this.OfferRootUri, resourceType: ResourceType.Offer, diff --git a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs b/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs index 9cf4722ce5..881e9bae02 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/QueryResponses/ChangeFeedIteratorCore.cs @@ -10,6 +10,8 @@ namespace Microsoft.Azure.Cosmos using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.CosmosElements; + using Microsoft.Azure.Cosmos.Query.Core.Monads; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; /// /// Cosmos Change Feed iterator using FeedToken @@ -23,22 +25,19 @@ internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal private bool hasMoreResults = true; internal ChangeFeedIteratorCore( - CosmosClientContext clientContext, ContainerCore container, FeedTokenInternal feedTokenInternal, ChangeFeedRequestOptions changeFeedRequestOptions) - : this(clientContext, container, changeFeedRequestOptions) + : this(container, changeFeedRequestOptions) { if (feedTokenInternal == null) throw new ArgumentNullException(nameof(feedTokenInternal)); this.feedTokenInternal = feedTokenInternal; } internal ChangeFeedIteratorCore( - CosmosClientContext clientContext, ContainerCore container, ChangeFeedRequestOptions changeFeedRequestOptions) { - if (clientContext == null) throw new ArgumentNullException(nameof(clientContext)); if (container == null) throw new ArgumentNullException(nameof(container)); if (changeFeedRequestOptions != null && changeFeedRequestOptions.MaxItemCount.HasValue @@ -47,7 +46,7 @@ internal ChangeFeedIteratorCore( throw new ArgumentOutOfRangeException(nameof(changeFeedRequestOptions.MaxItemCount)); } - this.clientContext = clientContext; + this.clientContext = container.ClientContext; this.container = container; this.changeFeedOptions = changeFeedRequestOptions ?? new ChangeFeedRequestOptions(); } @@ -69,7 +68,7 @@ public override public override Task ReadNextAsync(CancellationToken cancellationToken = default(CancellationToken)) { CosmosDiagnosticsContext diagnostics = CosmosDiagnosticsContext.Create(this.changeFeedOptions); - using (diagnostics.CreateScope("ChangeFeedReadNextAsync")) + using (diagnostics.CreateOverallScope("ChangeFeedReadNextAsync")) { return this.ReadNextInternalAsync(diagnostics, cancellationToken); } @@ -83,18 +82,21 @@ private async Task ReadNextInternalAsync( if (this.feedTokenInternal == null) { - Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.clientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(); - string containerRId = await this.container.GetRIDAsync(cancellationToken); - IReadOnlyList partitionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync( - containerRId, - new Documents.Routing.Range( - Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, - Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey, - isMinInclusive: true, - isMaxInclusive: false), - forceRefresh: true); - // ReadAll scenario, initialize with one token for all - this.feedTokenInternal = new FeedTokenEPKRange(containerRId, partitionKeyRanges); + TryCatch tryCatchFeedTokeninternal = await this.TryInitializeFeedTokenAsync(cancellationToken); + if (!tryCatchFeedTokeninternal.Succeeded) + { + if (tryCatchFeedTokeninternal.Exception.InnerException is CosmosException cosmosException) + { + return cosmosException.ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnosticsScope)); + } + + return CosmosExceptionFactory.CreateInternalServerErrorException( + message: tryCatchFeedTokeninternal.Exception.InnerException.Message, + innerException: tryCatchFeedTokeninternal.Exception.InnerException, + diagnosticsContext: diagnosticsScope).ToCosmosResponseMessage(new RequestMessage(method: null, requestUri: null, diagnosticsContext: diagnosticsScope)); + } + + this.feedTokenInternal = tryCatchFeedTokeninternal.Result; } Uri resourceUri = this.container.LinkUri; @@ -138,6 +140,31 @@ private async Task ReadNextInternalAsync( return responseMessage; } + private async Task> TryInitializeFeedTokenAsync(CancellationToken cancellationToken) + { + Routing.PartitionKeyRangeCache partitionKeyRangeCache = await this.clientContext.DocumentClient.GetPartitionKeyRangeCacheAsync(); + string containerRId = string.Empty; + try + { + containerRId = await this.container.GetRIDAsync(cancellationToken); + } + catch (Exception cosmosException) + { + return TryCatch.FromException(cosmosException); + } + + IReadOnlyList partitionKeyRanges = await partitionKeyRangeCache.TryGetOverlappingRangesAsync( + containerRId, + new Documents.Routing.Range( + Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, + Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey, + isMinInclusive: true, + isMaxInclusive: false), + forceRefresh: true); + // ReadAll scenario, initialize with one token for all + return TryCatch.FromResult(new FeedTokenEPKRange(containerRId, partitionKeyRanges)); + } + public override CosmosElement GetCosmsoElementContinuationToken() { throw new NotImplementedException(); diff --git a/Microsoft.Azure.Cosmos/src/Resource/Scripts/ScriptsCore.cs b/Microsoft.Azure.Cosmos/src/Resource/Scripts/ScriptsCore.cs index 0ae833a1d9..904ffb3092 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/Scripts/ScriptsCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/Scripts/ScriptsCore.cs @@ -75,13 +75,13 @@ public override FeedIterator GetStoredProcedureQueryStreamIterator( string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.clientContext, + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.clientContext, this.container.LinkUri, - ResourceType.StoredProcedure, - queryDefinition, - continuationToken, - requestOptions); + resourceType: ResourceType.StoredProcedure, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } public override FeedIterator GetStoredProcedureQueryIterator( @@ -287,13 +287,13 @@ public override FeedIterator GetTriggerQueryStreamIterator( string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.clientContext, + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.clientContext, this.container.LinkUri, - ResourceType.Trigger, - queryDefinition, - continuationToken, - requestOptions); + resourceType: ResourceType.Trigger, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } public override FeedIterator GetTriggerQueryIterator( @@ -447,13 +447,13 @@ public override FeedIterator GetUserDefinedFunctionQueryStreamIterator( string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.clientContext, + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.clientContext, this.container.LinkUri, - ResourceType.UserDefinedFunction, - queryDefinition, - continuationToken, - requestOptions); + resourceType: ResourceType.UserDefinedFunction, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } public override FeedIterator GetUserDefinedFunctionQueryIterator( diff --git a/Microsoft.Azure.Cosmos/src/Resource/User/UserCore.cs b/Microsoft.Azure.Cosmos/src/Resource/User/UserCore.cs index 5c587210d0..d88d3eaae9 100644 --- a/Microsoft.Azure.Cosmos/src/Resource/User/UserCore.cs +++ b/Microsoft.Azure.Cosmos/src/Resource/User/UserCore.cs @@ -228,13 +228,13 @@ public FeedIterator GetPermissionQueryStreamIterator(QueryDefinition queryDefini string continuationToken = null, QueryRequestOptions requestOptions = null) { - return new FeedIteratorCore( - this.ClientContext, + return FeedIteratorCore.CreateForNonPartitionedResource( + clientContext: this.ClientContext, this.LinkUri, - ResourceType.Permission, - queryDefinition, - continuationToken, - requestOptions); + resourceType: ResourceType.Permission, + queryDefinition: queryDefinition, + continuationToken: continuationToken, + options: requestOptions); } public override FeedIterator GetPermissionQueryIterator(string queryText = null, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs index 8acac09f3e..4d7b5bd30a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ChangeFeedIteratorCoreTests.cs @@ -104,6 +104,37 @@ await setIteratorNew.ReadNextAsync(this.cancellationToken)) Assert.AreEqual(expectedFinalCount, totalCount); } + /// + /// Test to verify that StarTime works as expected by inserting 50 items in two batches of 25 but capturing the time of just the second batch. + /// + [TestMethod] + public async Task ChangeFeedIteratorCore_StartTime() + { + int totalCount = 0; + int batchSize = 25; + await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); + await Task.Delay(1000); + DateTime now = DateTime.UtcNow; + await Task.Delay(1000); + await this.CreateRandomItems(this.Container, batchSize, randomPartitionKey: true); + ContainerCore itemsCore = this.Container; + FeedIterator feedIterator = itemsCore.GetChangeFeedStreamIterator(changeFeedRequestOptions: new ChangeFeedRequestOptions() { StartTime = now }); + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + } + } + + Assert.AreEqual(totalCount, batchSize); + } + /// /// Verify that we can read the Change Feed for a Partition Key and that does not read other items. /// diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ReadFeedTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ReadFeedTokenTests.cs new file mode 100644 index 0000000000..b34bcc36fc --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/FeedToken/ReadFeedTokenTests.cs @@ -0,0 +1,311 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Globalization; + using System.Linq; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Query; + using Microsoft.Azure.Cosmos.Query.Core; + using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Newtonsoft.Json; + + [TestClass] + public class ReadFeedTokenTests : BaseCosmosClientHelper + { + private ContainerCore Container = null; + private ContainerCore LargerContainer = null; + + [TestInitialize] + public async Task TestInitialize() + { + await base.TestInit(); + string PartitionKey = "/status"; + ContainerResponse response = await this.database.CreateContainerAsync( + new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey), + cancellationToken: this.cancellationToken); + Assert.IsNotNull(response); + Assert.IsNotNull(response.Container); + Assert.IsNotNull(response.Resource); + + ContainerResponse largerContainer = await this.database.CreateContainerAsync( + new ContainerProperties(id: Guid.NewGuid().ToString(), partitionKeyPath: PartitionKey), + throughput: 20000, + cancellationToken: this.cancellationToken); + + this.Container = (ContainerInlineCore)response; + this.LargerContainer = (ContainerInlineCore)largerContainer; + } + + [TestCleanup] + public async Task Cleanup() + { + await base.TestCleanup(); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_AllowsParallelProcessing() + { + int batchSize = 1000; + + await this.CreateRandomItems(this.LargerContainer, batchSize, randomPartitionKey: true); + ContainerCore itemsCore = this.LargerContainer; + IReadOnlyList tokens = await itemsCore.GetFeedTokensAsync(); + + List> tasks = tokens.Select(token => Task.Run(async () => + { + int count = 0; + FeedIteratorCore feedIterator = itemsCore.GetItemQueryStreamIterator(token) as FeedIteratorCore; + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + Assert.IsNotNull(feedIterator.FeedToken); + + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + count += response.Count; + } + } + } + + return count; + + })).ToList(); + + await Task.WhenAll(tasks); + + int documentsRead = 0; + foreach (Task task in tasks) + { + documentsRead += task.Result; + } + + Assert.AreEqual(batchSize, documentsRead); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_ReadAll() + { + int totalCount = 0; + int batchSize = 1000; + + await this.CreateRandomItems(this.LargerContainer, batchSize, randomPartitionKey: true); + ContainerCore itemsCore = this.LargerContainer; + FeedIteratorCore feedIterator = itemsCore.GetItemQueryStreamIterator(queryDefinition: null, requestOptions: new QueryRequestOptions() { MaxItemCount = 1 } ) as FeedIteratorCore; + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + Assert.IsNotNull(feedIterator.FeedToken); + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + } + } + + Assert.AreEqual(batchSize, totalCount); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_ReadOnlyPartitionKey() + { + int totalCount = 0; + int firstRunTotal = 25; + int batchSize = 25; + + string pkToRead = "pkToRead"; + string otherPK = "otherPK"; + + for (int i = 0; i < batchSize; i++) + { + await this.Container.CreateItemAsync(this.CreateRandomToDoActivity(pkToRead)); + } + + for (int i = 0; i < batchSize; i++) + { + await this.Container.CreateItemAsync(this.CreateRandomToDoActivity(otherPK)); + } + + ContainerCore itemsCore = this.Container; + FeedIterator feedIterator = itemsCore.GetItemQueryStreamIterator(requestOptions: new QueryRequestOptions() { PartitionKey = new PartitionKey(pkToRead) }); + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + foreach (ToDoActivity toDoActivity in response) + { + Assert.AreEqual(pkToRead, toDoActivity.status); + } + } + } + } + + Assert.AreEqual(firstRunTotal, totalCount); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_PassingFeedToken_ReadAll() + { + int totalCount = 0; + int batchSize = 1000; + + await this.CreateRandomItems(this.LargerContainer, batchSize, randomPartitionKey: true); + ContainerCore itemsCore = this.LargerContainer; + + FeedIteratorCore initialFeedIterator = itemsCore.GetItemQueryStreamIterator(queryDefinition: null, requestOptions: new QueryRequestOptions() { MaxItemCount = 1 }) as FeedIteratorCore; + while (initialFeedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await initialFeedIterator.ReadNextAsync(this.cancellationToken)) + { + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + break; + } + } + + // Use the previous iterators FeedToken to continue + FeedIteratorCore feedIterator = itemsCore.GetItemQueryStreamIterator(queryDefinition: null, feedToken: initialFeedIterator.FeedToken) as FeedIteratorCore; + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + Assert.IsNotNull(feedIterator.FeedToken); + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + } + } + + Assert.AreEqual(batchSize, totalCount); + } + + /// + /// Check to see how the older continuation token approach works when mixed with FeedToken + /// + /// + [TestMethod] + public async Task ReadFeedIteratorCore_ReadAll_MixContinuationToken() + { + int totalCount = 0; + int batchSize = 1000; + + await this.CreateRandomItems(this.LargerContainer, batchSize, randomPartitionKey: true); + ContainerCore itemsCore = this.LargerContainer; + + // Do a read without FeedToken and get the older CT from Header + string olderContinuationToken = null; + FeedIteratorCore feedIterator = itemsCore.GetItemQueryStreamIterator(queryDefinition: null, requestOptions: new QueryRequestOptions() { MaxItemCount = 1 }) as FeedIteratorCore; + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + olderContinuationToken = responseMessage.Headers.ContinuationToken; + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + break; + } + } + + // start a new iterator using the older CT and expect it to continue + feedIterator = itemsCore.GetItemQueryStreamIterator(queryDefinition: null, continuationToken: olderContinuationToken, requestOptions: new QueryRequestOptions() { MaxItemCount = 1 }) as FeedIteratorCore; + while (feedIterator.HasMoreResults) + { + using (ResponseMessage responseMessage = + await feedIterator.ReadNextAsync(this.cancellationToken)) + { + Assert.IsNotNull(feedIterator.FeedToken); + if (responseMessage.IsSuccessStatusCode) + { + Collection response = TestCommon.SerializerCore.FromStream>(responseMessage.Content).Data; + totalCount += response.Count; + } + } + } + + Assert.AreEqual(batchSize, totalCount); + } + + + private async Task> CreateRandomItems(ContainerCore container, int pkCount, int perPKItemCount = 1, bool randomPartitionKey = true) + { + Assert.IsFalse(!randomPartitionKey && perPKItemCount > 1); + + List createdList = new List(); + for (int i = 0; i < pkCount; i++) + { + string pk = "TBD"; + if (randomPartitionKey) + { + pk += Guid.NewGuid().ToString(); + } + + for (int j = 0; j < perPKItemCount; j++) + { + ToDoActivity temp = this.CreateRandomToDoActivity(pk); + + createdList.Add(temp); + + await container.CreateItemAsync(item: temp); + } + } + + return createdList; + } + + private ToDoActivity CreateRandomToDoActivity(string pk = null) + { + if (string.IsNullOrEmpty(pk)) + { + pk = "TBD" + Guid.NewGuid().ToString(); + } + + return new ToDoActivity() + { + id = Guid.NewGuid().ToString(), + description = "CreateRandomToDoActivity", + status = pk, + taskNum = 42, + cost = double.MaxValue + }; + } + + + public class ToDoActivity + { + public string id { get; set; } + public int taskNum { get; set; } + public double cost { get; set; } + public string description { get; set; } + public string status { get; set; } + } + } +} \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ChangeFeedIteratorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ChangeFeedIteratorCoreTests.cs index 3aae654eb5..6b6985a2a5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ChangeFeedIteratorCoreTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ChangeFeedIteratorCoreTests.cs @@ -5,35 +5,31 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; + using System.Collections.Generic; using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Handlers; + using Microsoft.Azure.Cosmos.Routing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; [TestClass] public class ChangeFeedIteratorCoreTests { - [TestMethod] - [ExpectedException(typeof(ArgumentNullException))] - public void ChangeFeedIteratorCore_Null_Context() - { - new ChangeFeedIteratorCore(null, Mock.Of(), new ChangeFeedRequestOptions()); - } - [TestMethod] [ExpectedException(typeof(ArgumentNullException))] public void ChangeFeedIteratorCore_Null_Container() { - new ChangeFeedIteratorCore(Mock.Of(), null, new ChangeFeedRequestOptions()); + new ChangeFeedIteratorCore(null, new ChangeFeedRequestOptions()); } [TestMethod] [ExpectedException(typeof(ArgumentNullException))] public void ChangeFeedIteratorCore_Null_Token() { - new ChangeFeedIteratorCore(Mock.Of(), Mock.Of(), null, new ChangeFeedRequestOptions()); + new ChangeFeedIteratorCore(Mock.Of(), null, new ChangeFeedRequestOptions()); } [DataTestMethod] @@ -42,13 +38,13 @@ public void ChangeFeedIteratorCore_Null_Token() [DataRow(0)] public void ChangeFeedIteratorCore_ValidateOptions(int maxItemCount) { - new ChangeFeedIteratorCore(Mock.Of(), Mock.Of(), new ChangeFeedRequestOptions() { MaxItemCount = maxItemCount }); + new ChangeFeedIteratorCore(Mock.Of(), new ChangeFeedRequestOptions() { MaxItemCount = maxItemCount }); } [TestMethod] public void ChangeFeedIteratorCore_HasMoreResultsDefault() { - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(Mock.Of(), Mock.Of(), null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(Mock.Of(), null); Assert.IsTrue(changeFeedIteratorCore.HasMoreResults); } @@ -56,7 +52,7 @@ public void ChangeFeedIteratorCore_HasMoreResultsDefault() public void ChangeFeedIteratorCore_FeedToken() { FeedTokenInternal feedToken = Mock.Of(); - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(Mock.Of(), Mock.Of(), feedToken, null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(Mock.Of(), feedToken, null); Assert.AreEqual(feedToken, changeFeedIteratorCore.FeedToken); } @@ -66,6 +62,7 @@ public async Task ChangeFeedIteratorCore_ReadNextAsync() string continuation = "TBD"; ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); responseMessage.Headers.ETag = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; Mock cosmosClientContext = new Mock(); cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); @@ -84,6 +81,9 @@ public async Task ChangeFeedIteratorCore_ReadNextAsync() .Returns(Task.FromResult(responseMessage)); ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); FeedTokenInternal feedToken = Mock.Of(); Mock.Get(feedToken) .Setup(f => f.EnrichRequest(It.IsAny())); @@ -91,7 +91,7 @@ public async Task ChangeFeedIteratorCore_ReadNextAsync() .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(false)); - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(cosmosClientContext.Object, containerCore, feedToken, null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerCore, feedToken, null); ResponseMessage response = await changeFeedIteratorCore.ReadNextAsync(); Assert.AreEqual(feedToken, changeFeedIteratorCore.FeedToken); @@ -108,6 +108,7 @@ public async Task ChangeFeedIteratorCore_OfT_ReadNextAsync() string continuation = "TBD"; ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); responseMessage.Headers.ETag = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; Mock cosmosClientContext = new Mock(); cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); @@ -126,6 +127,9 @@ public async Task ChangeFeedIteratorCore_OfT_ReadNextAsync() .Returns(Task.FromResult(responseMessage)); ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); FeedTokenInternal feedToken = Mock.Of(); Mock.Get(feedToken) .Setup(f => f.EnrichRequest(It.IsAny())); @@ -133,7 +137,7 @@ public async Task ChangeFeedIteratorCore_OfT_ReadNextAsync() .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(false)); - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(cosmosClientContext.Object, containerCore, feedToken, null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerCore, feedToken, null); bool creatorCalled = false; Func> creator = (ResponseMessage r) => @@ -178,6 +182,9 @@ public async Task ChangeFeedIteratorCore_UpdatesContinuation_On304() .Returns(Task.FromResult(responseMessage)); ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); FeedTokenInternal feedToken = Mock.Of(); Mock.Get(feedToken) .Setup(f => f.EnrichRequest(It.IsAny())); @@ -185,7 +192,7 @@ public async Task ChangeFeedIteratorCore_UpdatesContinuation_On304() .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(false)); - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(cosmosClientContext.Object, containerCore, feedToken, null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerCore, feedToken, null); ResponseMessage response = await changeFeedIteratorCore.ReadNextAsync(); Mock.Get(feedToken) @@ -219,6 +226,9 @@ public async Task ChangeFeedIteratorCore_DoesNotUpdateContinuation_OnError() .Returns(Task.FromResult(responseMessage)); ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); FeedTokenInternal feedToken = Mock.Of(); Mock.Get(feedToken) .Setup(f => f.EnrichRequest(It.IsAny())); @@ -226,7 +236,7 @@ public async Task ChangeFeedIteratorCore_DoesNotUpdateContinuation_OnError() .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) .Returns(Task.FromResult(false)); - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(cosmosClientContext.Object, containerCore, feedToken, null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerCore, feedToken, null); ResponseMessage response = await changeFeedIteratorCore.ReadNextAsync(); Assert.IsFalse(changeFeedIteratorCore.HasMoreResults); @@ -244,6 +254,7 @@ public async Task ChangeFeedIteratorCore_Retries() string continuation = "TBD"; ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); responseMessage.Headers.ETag = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; Mock cosmosClientContext = new Mock(); cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); @@ -262,6 +273,9 @@ public async Task ChangeFeedIteratorCore_Retries() .Returns(Task.FromResult(responseMessage)); ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); FeedTokenInternal feedToken = Mock.Of(); Mock.Get(feedToken) .Setup(f => f.EnrichRequest(It.IsAny())); @@ -270,7 +284,7 @@ public async Task ChangeFeedIteratorCore_Retries() .Returns(Task.FromResult(true)) .Returns(Task.FromResult(false)); - ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(cosmosClientContext.Object, containerCore, feedToken, null); + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerCore, feedToken, null); ResponseMessage response = await changeFeedIteratorCore.ReadNextAsync(); Mock.Get(feedToken) @@ -292,5 +306,92 @@ public async Task ChangeFeedIteratorCore_Retries() It.IsAny(), It.IsAny()), Times.Exactly(2)); } + + [TestMethod] + public async Task ChangeFeedIteratorCore_HandlesSplitsThroughPipeline() + { + int executionCount = 0; + CosmosClientContext cosmosClientContext = GetMockedClientContext((RequestMessage requestMessage, CancellationToken cancellationToken) => + { + // Force OnBeforeRequestActions call + requestMessage.ToDocumentServiceRequest(); + if (executionCount++ == 0) + { + return TestHandler.ReturnStatusCode(HttpStatusCode.Gone, Documents.SubStatusCodes.PartitionKeyRangeGone); + } + + return TestHandler.ReturnStatusCode(HttpStatusCode.OK); + }); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext); + Mock.Get(containerCore) + .Setup(c => c.LinkUri) + .Returns(new Uri("https://dummy.documents.azure.com:443/dbs")); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.EnrichRequest(It.IsAny())); + Mock.Get(feedToken) + .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(false)); + + ChangeFeedIteratorCore changeFeedIteratorCore = new ChangeFeedIteratorCore(containerCore, feedToken, null); + + ResponseMessage response = await changeFeedIteratorCore.ReadNextAsync(); + + Assert.AreEqual(1, executionCount, "PartitionKeyRangeGoneRetryHandler handled the Split"); + Assert.AreEqual(HttpStatusCode.Gone, response.StatusCode); + + Mock.Get(feedToken) + .Verify(f => f.UpdateContinuation(It.IsAny()), Times.Never); + + Mock.Get(feedToken) + .Verify(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny()), Times.Once); + } + + private static CosmosClientContext GetMockedClientContext( + Func> handlerFunc) + { + CosmosClient client = MockCosmosUtil.CreateMockCosmosClient(); + + Mock partitionRoutingHelperMock = MockCosmosUtil.GetPartitionRoutingHelperMock("0"); + + TestHandler testHandler = new TestHandler(handlerFunc); + + // Similar to FeedPipeline but with replaced transport + RequestHandler[] feedPipeline = new RequestHandler[] + { + new NamedCacheRetryHandler(), + new PartitionKeyRangeHandler(client), + testHandler, + }; + + RequestHandler feedHandler = ClientPipelineBuilder.CreatePipeline(feedPipeline); + + RequestHandler handler = client.RequestHandler.InnerHandler; + while (handler != null) + { + if (handler.InnerHandler is RouterHandler) + { + handler.InnerHandler = new RouterHandler(feedHandler, testHandler); + break; + } + + handler = handler.InnerHandler; + } + + CosmosResponseFactory responseFactory = new CosmosResponseFactory(MockCosmosUtil.Serializer); + + return new ClientContextCore( + client: client, + clientOptions: new CosmosClientOptions(), + serializerCore: MockCosmosUtil.Serializer, + cosmosResponseFactory: responseFactory, + requestHandler: client.RequestHandler, + documentClient: new MockDocumentClient(), + userAgent: null); + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/FeedTokenTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/FeedTokenTests.cs index 9673e57c86..3e595dec30 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/FeedTokenTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/FeedTokenTests.cs @@ -6,13 +6,10 @@ namespace Microsoft.Azure.Cosmos.Tests { using System; using System.Collections.Generic; - using System.Diagnostics; - using System.IO; using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; - using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Query.Core.ContinuationTokens; using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; @@ -93,6 +90,20 @@ public void FeedToken_EPK_EnrichRequest() Assert.ThrowsException(() => token.EnrichRequest(null)); } + [TestMethod] + public void FeedToken_EPK_NotEnrichRequest_IfEPKAlreadyExists() + { + const string containerRid = "containerRid"; + string epkString = Guid.NewGuid().ToString(); + FeedTokenEPKRange token = new FeedTokenEPKRange(containerRid, new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive = "B" }); + RequestMessage requestMessage = new RequestMessage(); + requestMessage.Properties[HandlerConstants.StartEpkString] = epkString; + requestMessage.Properties[HandlerConstants.EndEpkString] = epkString; + token.EnrichRequest(requestMessage); + Assert.AreEqual(epkString, requestMessage.Properties[HandlerConstants.StartEpkString]); + Assert.AreEqual(epkString, requestMessage.Properties[HandlerConstants.EndEpkString]); + } + [TestMethod] public void FeedToken_PartitionKey_TryParse() { @@ -172,8 +183,9 @@ public async Task FeedToken_EPK_ShouldRetry() FeedTokenEPKRange feedTokenEPKRange = new FeedTokenEPKRange(Guid.NewGuid().ToString(), new Documents.Routing.Range(compositeContinuationTokens[0].Range.Min, compositeContinuationTokens[1].Range.Min, true, false), compositeContinuationTokens); ContainerCore containerCore = Mock.Of(); - - Assert.IsFalse(await feedTokenEPKRange.ShouldRetryAsync(containerCore, new ResponseMessage(HttpStatusCode.OK))); + ResponseMessage okResponse = new ResponseMessage(HttpStatusCode.OK); + okResponse.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + Assert.IsFalse(await feedTokenEPKRange.ShouldRetryAsync(containerCore, okResponse)); // A 304 on a multi Range token should cycle on all available ranges before stopping retrying Assert.IsTrue(await feedTokenEPKRange.ShouldRetryAsync(containerCore, new ResponseMessage(HttpStatusCode.NotModified))); @@ -277,6 +289,75 @@ public async Task FeedToken_PartitionKeyRange_HandleSplits() Assert.AreEqual(documentClient.AvailablePartitionKeyRanges[1].MaxExclusive, continuationTokens[1].Range.Max); } + [TestMethod] + public void FeedToken_PartitionKey_IsDone() + { + PartitionKey pk = new PartitionKey("test"); + FeedTokenPartitionKey token = new FeedTokenPartitionKey(pk); + token.UpdateContinuation(Guid.NewGuid().ToString()); + Assert.IsFalse(token.IsDone); + token.UpdateContinuation(null); + Assert.IsTrue(token.IsDone); + } + + [TestMethod] + public void FeedToken_PartitionKeyRange_IsDone() + { + string pkrangeId = "0"; + FeedTokenPartitionKeyRange token = new FeedTokenPartitionKeyRange(pkrangeId); + token.UpdateContinuation(Guid.NewGuid().ToString()); + Assert.IsFalse(token.IsDone); + token.UpdateContinuation(null); + Assert.IsTrue(token.IsDone); + } + + [TestMethod] + public void FeedToken_EPK_IsDone() + { + const string containerRid = "containerRid"; + FeedTokenEPKRange token = new FeedTokenEPKRange(containerRid, + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive = "B" }); + + token.UpdateContinuation(Guid.NewGuid().ToString()); + Assert.IsFalse(token.IsDone); + + token.UpdateContinuation(null); + Assert.IsTrue(token.IsDone); + } + + [TestMethod] + public void FeedToken_EPK_IsDone_MultipleRanges() + { + const string containerRid = "containerRid"; + FeedTokenEPKRange token = new FeedTokenEPKRange(containerRid, + new List() { + new Documents.PartitionKeyRange() { MinInclusive = "A", MaxExclusive = "B" }, + new Documents.PartitionKeyRange() { MinInclusive = "B", MaxExclusive = "C" }, + new Documents.PartitionKeyRange() { MinInclusive = "C", MaxExclusive = "D" } + }); + + // First range has continuation + token.UpdateContinuation(Guid.NewGuid().ToString()); + Assert.IsFalse(token.IsDone); + + // Second range is done + token.UpdateContinuation(null); + Assert.IsFalse(token.IsDone); + + // Third range is done + token.UpdateContinuation(null); + Assert.IsFalse(token.IsDone); + + // First range has continuation + token.UpdateContinuation(Guid.NewGuid().ToString()); + Assert.IsFalse(token.IsDone); + + // MoveNext should skip the second and third + // Finish first one + token.UpdateContinuation(null); + Assert.IsTrue(token.IsDone); + } + private static CompositeContinuationToken BuildTokenForRange( string min, string max, diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ReadFeedTokenIteratorCoreTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ReadFeedTokenIteratorCoreTests.cs new file mode 100644 index 0000000000..dbecdec2f6 --- /dev/null +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/FeedToken/ReadFeedTokenIteratorCoreTests.cs @@ -0,0 +1,434 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Tests +{ + using System; + using System.IO; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Azure.Cosmos.Handlers; + using Microsoft.Azure.Cosmos.Routing; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class ReadFeedIteratorCoreTests + { + [TestMethod] + public void ReadFeedIteratorCore_HasMoreResultsDefault() + { + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(Mock.Of(), new Uri("http://localhost"), Documents.ResourceType.Document, null, null, null, new QueryRequestOptions()); + Assert.IsTrue(feedTokenIterator.HasMoreResults); + } + + [TestMethod] + public void ReadFeedIteratorCore_FeedToken() + { + FeedTokenInternal feedToken = Mock.Of(); + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(Mock.Of(), new Uri("http://localhost"), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + Assert.AreEqual(feedToken, feedTokenIterator.FeedToken); + } + + [TestMethod] + public void ReadFeedIteratorCore_TryGetContinuation() + { + string continuation = Guid.NewGuid().ToString(); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.GetContinuation()).Returns(continuation); + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(Mock.Of(), new Uri("http://localhost"), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + Assert.AreEqual(continuation, feedTokenIterator.ContinuationToken); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_ReadNextAsync() + { + string continuation = "TBD"; + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); + responseMessage.Headers.ContinuationToken = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext + .Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(responseMessage)); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.EnrichRequest(It.IsAny())); + Mock.Get(feedToken) + .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(false)); + Mock.Get(feedToken) + .Setup(f => f.GetContinuation()) + .Returns(continuation); + Mock.Get(feedToken) + .Setup(f => f.IsDone) + .Returns(true); + + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(containerCore, new Uri("http://localhost"), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + ResponseMessage response = await feedTokenIterator.ReadNextAsync(); + + Assert.AreEqual(feedToken, feedTokenIterator.FeedToken); + Mock.Get(feedToken) + .Verify(f => f.UpdateContinuation(It.Is(ct => ct == continuation)), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny()), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.IsDone, Times.Once); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_OfT_ReadNextAsync() + { + string continuation = "TBD"; + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); + responseMessage.Headers.ContinuationToken = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext + .Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(responseMessage)); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.EnrichRequest(It.IsAny())); + Mock.Get(feedToken) + .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(false)); + Mock.Get(feedToken) + .Setup(f => f.GetContinuation()) + .Returns(continuation); + Mock.Get(feedToken) + .Setup(f => f.IsDone) + .Returns(true); + + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(containerCore, new Uri("http://localhost"), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + bool creatorCalled = false; + Func> creator = (ResponseMessage r) => + { + creatorCalled = true; + return Mock.Of>(); + }; + + FeedIteratorCore feedTokenIteratorOfT = new FeedIteratorCore(feedTokenIterator, creator); + FeedResponse response = await feedTokenIteratorOfT.ReadNextAsync(); + + Assert.IsTrue(creatorCalled, "Response creator not called"); + Mock.Get(feedToken) + .Verify(f => f.UpdateContinuation(It.Is(ct => ct == continuation)), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny()), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.IsDone, Times.Once); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_UpdatesContinuation_OnOK() + { + string continuation = "TBD"; + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); + responseMessage.Headers.ContinuationToken = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext + .Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(responseMessage)); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.EnrichRequest(It.IsAny())); + Mock.Get(feedToken) + .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(false)); + Mock.Get(feedToken) + .Setup(f => f.GetContinuation()) + .Returns(continuation); + Mock.Get(feedToken) + .Setup(f => f.IsDone) + .Returns(true); + + FeedIterator feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(containerCore, new Uri("http://localhost"), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + ResponseMessage response = await feedTokenIterator.ReadNextAsync(); + + Mock.Get(feedToken) + .Verify(f => f.UpdateContinuation(It.Is(ct => ct == continuation)), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny()), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.IsDone, Times.Once); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_DoesNotUpdateContinuation_OnError() + { + string continuation = "TBD"; + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.Gone); + responseMessage.Headers.ContinuationToken = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext + .Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(responseMessage)); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.EnrichRequest(It.IsAny())); + Mock.Get(feedToken) + .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(false)); + Mock.Get(feedToken) + .Setup(f => f.GetContinuation()) + .Returns(continuation); + Mock.Get(feedToken) + .Setup(f => f.IsDone) + .Returns(true); + + FeedIterator feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(containerCore, new Uri("http://localhost"), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + ResponseMessage response = await feedTokenIterator.ReadNextAsync(); + + Mock.Get(feedToken) + .Verify(f => f.UpdateContinuation(It.Is(ct => ct == continuation)), Times.Never); + + Mock.Get(feedToken) + .Verify(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny()), Times.Once); + + Mock.Get(feedToken) + .Verify(f => f.IsDone, Times.Never); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_WithNoInitialState_ReadNextAsync() + { + string continuation = "TBD"; + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); + responseMessage.Headers.ContinuationToken = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext + .Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(responseMessage)); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext.Object); + + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForPartitionedResource(containerCore, new Uri("http://localhost"), Documents.ResourceType.Document, null, null, null, new QueryRequestOptions()); + ResponseMessage response = await feedTokenIterator.ReadNextAsync(); + + FeedToken feedTokenOut = feedTokenIterator.FeedToken; + Assert.IsNotNull(feedTokenOut); + + FeedTokenEPKRange feedTokenEPKRange = feedTokenOut as FeedTokenEPKRange; + // Assert that a FeedToken for the entire range is used + Assert.AreEqual(Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, feedTokenEPKRange.CompleteRange.Min); + Assert.AreEqual(Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey, feedTokenEPKRange.CompleteRange.Max); + Assert.AreEqual(continuation, feedTokenEPKRange.CompositeContinuationTokens.Peek().Token); + Assert.IsFalse(feedTokenEPKRange.IsDone); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_ForNonPartitionedResource_WithNoInitialState_ReadNextAsync() + { + string continuation = "TBD"; + ResponseMessage responseMessage = new ResponseMessage(HttpStatusCode.OK); + responseMessage.Headers.ContinuationToken = continuation; + responseMessage.Headers[Documents.HttpConstants.HttpHeaders.ItemCount] = "1"; + + Mock cosmosClientContext = new Mock(); + cosmosClientContext.Setup(c => c.ClientOptions).Returns(new CosmosClientOptions()); + cosmosClientContext + .Setup(c => c.ProcessResourceOperationStreamAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(Task.FromResult(responseMessage)); + + FeedIteratorCore feedTokenIterator = FeedIteratorCore.CreateForNonPartitionedResource(cosmosClientContext.Object, new Uri("http://localhost"), Documents.ResourceType.Document, null, null, new QueryRequestOptions()); + ResponseMessage response = await feedTokenIterator.ReadNextAsync(); + + FeedToken feedTokenOut = feedTokenIterator.FeedToken; + Assert.IsNotNull(feedTokenOut); + + FeedTokenEPKRange feedTokenEPKRange = feedTokenOut as FeedTokenEPKRange; + // Assert that a FeedToken for the entire range is used + Assert.AreEqual(Documents.Routing.PartitionKeyInternal.MinimumInclusiveEffectivePartitionKey, feedTokenEPKRange.CompleteRange.Min); + Assert.AreEqual(Documents.Routing.PartitionKeyInternal.MaximumExclusiveEffectivePartitionKey, feedTokenEPKRange.CompleteRange.Max); + Assert.AreEqual(continuation, feedTokenEPKRange.CompositeContinuationTokens.Peek().Token); + Assert.IsFalse(feedTokenEPKRange.IsDone); + } + + [TestMethod] + public async Task ReadFeedIteratorCore_HandlesSplitsThroughPipeline() + { + int executionCount = 0; + CosmosClientContext cosmosClientContext = GetMockedClientContext((RequestMessage requestMessage, CancellationToken cancellationToken) => + { + // Force OnBeforeRequestActions call + requestMessage.ToDocumentServiceRequest(); + if (executionCount++ == 0) + { + return TestHandler.ReturnStatusCode(HttpStatusCode.Gone, Documents.SubStatusCodes.PartitionKeyRangeGone); + } + + return TestHandler.ReturnStatusCode(HttpStatusCode.OK); + }); + + ContainerCore containerCore = Mock.Of(); + Mock.Get(containerCore) + .Setup(c => c.ClientContext) + .Returns(cosmosClientContext); + Mock.Get(containerCore) + .Setup(c => c.LinkUri) + .Returns(new Uri($"/dbs/db/colls/colls", UriKind.Relative)); + FeedTokenInternal feedToken = Mock.Of(); + Mock.Get(feedToken) + .Setup(f => f.EnrichRequest(It.IsAny())); + Mock.Get(feedToken) + .Setup(f => f.ShouldRetryAsync(It.Is(c => c == containerCore), It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(false)); + + FeedIteratorCore changeFeedIteratorCore = FeedIteratorCore.CreateForPartitionedResource(containerCore, new Uri($"/dbs/db/colls/colls", UriKind.Relative), Documents.ResourceType.Document, null, null, feedToken, new QueryRequestOptions()); + + ResponseMessage response = await changeFeedIteratorCore.ReadNextAsync(); + + Assert.AreEqual(1, executionCount, "Pipeline handled the Split"); + Assert.AreEqual(HttpStatusCode.Gone, response.StatusCode); + } + + private static CosmosClientContext GetMockedClientContext( + Func> handlerFunc) + { + CosmosClient client = MockCosmosUtil.CreateMockCosmosClient(); + + Mock partitionRoutingHelperMock = MockCosmosUtil.GetPartitionRoutingHelperMock("0"); + + TestHandler testHandler = new TestHandler(handlerFunc); + + // Similar to FeedPipeline but with replaced transport + RequestHandler[] feedPipeline = new RequestHandler[] + { + new NamedCacheRetryHandler(), + new PartitionKeyRangeHandler(client), + testHandler, + }; + + RequestHandler feedHandler = ClientPipelineBuilder.CreatePipeline(feedPipeline); + + RequestHandler handler = client.RequestHandler.InnerHandler; + while (handler != null) + { + if (handler.InnerHandler is RouterHandler) + { + handler.InnerHandler = new RouterHandler(feedHandler, testHandler); + break; + } + + handler = handler.InnerHandler; + } + + CosmosResponseFactory responseFactory = new CosmosResponseFactory(MockCosmosUtil.Serializer); + + return new ClientContextCore( + client: client, + clientOptions: new CosmosClientOptions(), + serializerCore: MockCosmosUtil.Serializer, + cosmosResponseFactory: responseFactory, + requestHandler: client.RequestHandler, + documentClient: new MockDocumentClient(), + userAgent: null); + } + } +} diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ResponseMessageTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ResponseMessageTests.cs index 118ec12a14..31dd76dd8c 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ResponseMessageTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ResponseMessageTests.cs @@ -4,6 +4,7 @@ namespace Microsoft.Azure.Cosmos.Tests { + using System; using Microsoft.Azure.Documents; using Microsoft.VisualStudio.TestTools.UnitTesting; @@ -51,5 +52,38 @@ public void IsFeedOperation_ForOtherOperations() request2.ResourceType = ResourceType.Database; Assert.IsFalse(request2.IsPartitionKeyRangeHandlerRequired); } + + [TestMethod] + public void IsFeedOperation_ForFeedTokenEPKRange() + { + RequestMessage request = new RequestMessage(); + request.OperationType = OperationType.ReadFeed; + request.ResourceType = ResourceType.Document; + FeedTokenInternal feedTokenEPKRange = new FeedTokenEPKRange(Guid.NewGuid().ToString(), new PartitionKeyRange() { MinInclusive = "AA", MaxExclusive = "BB", Id = "0" }); + feedTokenEPKRange.EnrichRequest(request); + Assert.IsTrue(request.IsPartitionKeyRangeHandlerRequired); + } + + [TestMethod] + public void IsFeedOperation_ForFeedTokenPartitionKeyRange() + { + RequestMessage request = new RequestMessage(); + request.OperationType = OperationType.ReadFeed; + request.ResourceType = ResourceType.Document; + FeedTokenInternal feedTokenEPKRange = new FeedTokenPartitionKeyRange("0"); + feedTokenEPKRange.EnrichRequest(request); + Assert.IsFalse(request.IsPartitionKeyRangeHandlerRequired); + } + + [TestMethod] + public void IsFeedOperation_ForFeedTokenPartitionKey() + { + RequestMessage request = new RequestMessage(); + request.OperationType = OperationType.ReadFeed; + request.ResourceType = ResourceType.Document; + FeedTokenInternal feedTokenEPKRange = new FeedTokenPartitionKey(new Cosmos.PartitionKey("0")); + feedTokenEPKRange.EnrichRequest(request); + Assert.IsFalse(request.IsPartitionKeyRangeHandlerRequired); + } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RetryHandlerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RetryHandlerTests.cs index 29caef2520..03fcc9e8b5 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RetryHandlerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/RetryHandlerTests.cs @@ -250,55 +250,5 @@ public async Task InvalidPartitionExceptionRetryHandlerDoesNotRetryOn410() await invoker.SendAsync(requestMessage, new CancellationToken()); Assert.AreEqual(expectedHandlerCalls, handlerCalls); } - - [TestMethod] - public async Task PartitionKeyRangeGoneRetryHandlerOnSuccess() - { - CosmosClient client = MockCosmosUtil.CreateMockCosmosClient(); - - PartitionKeyRangeGoneRetryHandler retryHandler = new PartitionKeyRangeGoneRetryHandler(client); - int handlerCalls = 0; - int expectedHandlerCalls = 1; - TestHandler testHandler = new TestHandler((request, cancellationToken) => { - handlerCalls++; - return TestHandler.ReturnSuccess(); - }); - - retryHandler.InnerHandler = testHandler; - RequestInvokerHandler invoker = new RequestInvokerHandler(client); - invoker.InnerHandler = retryHandler; - RequestMessage requestMessage = new RequestMessage(HttpMethod.Get, new Uri("https://dummy.documents.azure.com:443/dbs")); - await invoker.SendAsync(requestMessage, new CancellationToken()); - Assert.AreEqual(expectedHandlerCalls, handlerCalls); - } - - [TestMethod] - public async Task PartitionKeyRangeGoneRetryHandlerOn410() - { - CosmosClient client = MockCosmosUtil.CreateMockCosmosClient(); - - int handlerCalls = 0; - TestHandler testHandler = new TestHandler((request, response) => { - if (handlerCalls == 0) - { - handlerCalls++; - return TestHandler.ReturnStatusCode((HttpStatusCode)StatusCodes.Gone, SubStatusCodes.PartitionKeyRangeGone); - } - - handlerCalls++; - return TestHandler.ReturnSuccess(); - }); - - PartitionKeyRangeGoneRetryHandler retryHandler = new PartitionKeyRangeGoneRetryHandler(client); - retryHandler.InnerHandler = testHandler; - - RequestInvokerHandler invoker = new RequestInvokerHandler(client); - invoker.InnerHandler = retryHandler; - RequestMessage requestMessage = new RequestMessage(HttpMethod.Get, new Uri("https://localhost/dbs/db1/colls/col1/docs/doc1")); - await invoker.SendAsync(requestMessage, new CancellationToken()); - - int expectedHandlerCalls = 2; - Assert.AreEqual(expectedHandlerCalls, handlerCalls); - } } }