diff --git a/Microsoft.Azure.Cosmos.Encryption/tests/EmulatorTests/MdeEncryptionTests.cs b/Microsoft.Azure.Cosmos.Encryption/tests/EmulatorTests/MdeEncryptionTests.cs index 0073367c02..82824383e3 100644 --- a/Microsoft.Azure.Cosmos.Encryption/tests/EmulatorTests/MdeEncryptionTests.cs +++ b/Microsoft.Azure.Cosmos.Encryption/tests/EmulatorTests/MdeEncryptionTests.cs @@ -362,36 +362,6 @@ public async Task EncryptionResourceTokenAuthRestricted() } } - [TestMethod] - public async Task EncryptionFailsWithUnknownClientEncryptionKey() - { - ClientEncryptionIncludedPath unknownKeyConfigured = new ClientEncryptionIncludedPath() - { - Path = "/", - ClientEncryptionKeyId = "unknownKey", - EncryptionType = "Deterministic", - EncryptionAlgorithm = "AEAD_AES_256_CBC_HMAC_SHA256", - }; - - Collection paths = new Collection { unknownKeyConfigured }; - ClientEncryptionPolicy clientEncryptionPolicyId = new ClientEncryptionPolicy(paths); - - ContainerProperties containerProperties = new ContainerProperties(Guid.NewGuid().ToString(), "/PK") { ClientEncryptionPolicy = clientEncryptionPolicyId }; - - Container encryptionContainer = await database.CreateContainerAsync(containerProperties, 400); - - try - { - await encryptionContainer.InitializeEncryptionAsync(); - await MdeEncryptionTests.MdeCreateItemAsync(encryptionContainer); - Assert.Fail("Expected item creation should fail since client encryption policy is configured with unknown key."); - } - catch (Exception ex) - { - Assert.IsTrue(ex is InvalidOperationException); - } - } - [TestMethod] public async Task ClientEncryptionPolicyTests() { diff --git a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs index 3a137f4dcf..ad690baca4 100644 --- a/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs +++ b/Microsoft.Azure.Cosmos/src/Pagination/CrossPartitionRangePageAsyncEnumerator.cs @@ -147,12 +147,8 @@ public async ValueTask MoveNextAsync(ITrace trace) currentPaginator.FeedRangeState.FeedRange, childTrace, this.cancellationToken); - if (childRanges.Count == 0) - { - throw new InvalidOperationException("Got back no children"); - } - if (childRanges.Count == 1) + if (childRanges.Count <= 1) { // We optimistically assumed that the cache is not stale. // In the event that it is (where we only get back one child / the partition that we think got split) @@ -164,16 +160,32 @@ public async ValueTask MoveNextAsync(ITrace trace) this.cancellationToken); } - if (childRanges.Count() <= 1) + if (childRanges.Count < 1) { - throw new InvalidOperationException("Expected more than 1 child"); + string errorMessage = "SDK invariant violated 4795CC37: Must have at least one EPK range in a cross partition enumerator"; + throw Resource.CosmosExceptions.CosmosExceptionFactory.CreateInternalServerErrorException( + message: errorMessage, + headers: null, + stackTrace: null, + trace: childTrace, + error: new Microsoft.Azure.Documents.Error { Code = "SDK_invariant_violated_4795CC37", Message = errorMessage }); } - foreach (FeedRangeInternal childRange in childRanges) + if (childRanges.Count == 1) + { + // On a merge, the 410/1002 results in a single parent + // We maintain the current enumerator's range and let the RequestInvokerHandler logic kick in + enumerators.Enqueue(currentPaginator); + } + else { - PartitionRangePageAsyncEnumerator childPaginator = this.createPartitionRangeEnumerator( - new FeedRangeState(childRange, currentPaginator.FeedRangeState.State)); - enumerators.Enqueue(childPaginator); + // Split + foreach (FeedRangeInternal childRange in childRanges) + { + PartitionRangePageAsyncEnumerator childPaginator = this.createPartitionRangeEnumerator( + new FeedRangeState(childRange, currentPaginator.FeedRangeState.State)); + enumerators.Enqueue(childPaginator); + } } // Recursively retry diff --git a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs index 7d9973b14b..db71ec8c58 100644 --- a/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs +++ b/Microsoft.Azure.Cosmos/src/Query/Core/Pipeline/CrossPartition/OrderBy/OrderByCrossPartitionQueryPipelineStage.cs @@ -294,12 +294,8 @@ private async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( uninitializedEnumerator.FeedRangeState.FeedRange, trace, this.cancellationToken); - if (childRanges.Count == 0) - { - throw new InvalidOperationException("Got back no children"); - } - if (childRanges.Count == 1) + if (childRanges.Count <= 1) { // We optimistically assumed that the cache is not stale. // In the event that it is (where we only get back one child / the partition that we think got split) @@ -311,25 +307,49 @@ private async ValueTask MoveNextAsync_InitializeAsync_HandleSplitAsync( this.cancellationToken); } - if (childRanges.Count() <= 1) + if (childRanges.Count < 1) { - throw new InvalidOperationException("Expected more than 1 child"); + string errorMessage = "SDK invariant violated 82086B2D: Must have at least one EPK range in a cross partition enumerator"; + throw Resource.CosmosExceptions.CosmosExceptionFactory.CreateInternalServerErrorException( + message: errorMessage, + headers: null, + stackTrace: null, + trace: trace, + error: new Microsoft.Azure.Documents.Error { Code = "SDK_invariant_violated_82086B2D", Message = errorMessage }); } - foreach (FeedRangeInternal childRange in childRanges) + if (childRanges.Count == 1) { - this.cancellationToken.ThrowIfCancellationRequested(); - + // On a merge, the 410/1002 results in a single parent + // We maintain the current enumerator's range and let the RequestInvokerHandler logic kick in OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = new OrderByQueryPartitionRangePageAsyncEnumerator( this.documentContainer, uninitializedEnumerator.SqlQuerySpec, - new FeedRangeState(childRange, uninitializedEnumerator.StartOfPageState), + new FeedRangeState(uninitializedEnumerator.FeedRangeState.FeedRange, uninitializedEnumerator.StartOfPageState), partitionKey: null, uninitializedEnumerator.QueryPaginationOptions, uninitializedEnumerator.Filter, this.cancellationToken); this.uninitializedEnumeratorsAndTokens.Enqueue((childPaginator, token)); } + else + { + // Split + foreach (FeedRangeInternal childRange in childRanges) + { + this.cancellationToken.ThrowIfCancellationRequested(); + + OrderByQueryPartitionRangePageAsyncEnumerator childPaginator = new OrderByQueryPartitionRangePageAsyncEnumerator( + this.documentContainer, + uninitializedEnumerator.SqlQuerySpec, + new FeedRangeState(childRange, uninitializedEnumerator.StartOfPageState), + partitionKey: null, + uninitializedEnumerator.QueryPaginationOptions, + uninitializedEnumerator.Filter, + this.cancellationToken); + this.uninitializedEnumeratorsAndTokens.Enqueue((childPaginator, token)); + } + } // Recursively retry return await this.MoveNextAsync(trace); diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosContainerTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosContainerTests.cs index e03e974507..4537ec9f6b 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosContainerTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/CosmosContainerTests.cs @@ -1320,9 +1320,41 @@ public async Task TimeToLivePropertyPath() Assert.AreEqual(HttpStatusCode.NoContent, containerResponse.StatusCode); } + [TestMethod] + public async Task ContainerCreationFailsWithUnknownClientEncryptionKey() + { + ClientEncryptionIncludedPath unknownKeyConfigured = new ClientEncryptionIncludedPath() + { + Path = "/", + ClientEncryptionKeyId = "unknownKey", + EncryptionType = "Deterministic", + EncryptionAlgorithm = "AEAD_AES_256_CBC_HMAC_SHA256", + }; + + Collection paths = new Collection { unknownKeyConfigured }; + ClientEncryptionPolicy clientEncryptionPolicyId = new ClientEncryptionPolicy(paths); + + ContainerProperties containerProperties = new ContainerProperties(Guid.NewGuid().ToString(), "/PK") { ClientEncryptionPolicy = clientEncryptionPolicyId }; + + try + { + await this.cosmosDatabase.CreateContainerAsync(containerProperties, 400); + Assert.Fail("Expected container creation should fail since client encryption policy is configured with unknown key."); + } + catch (CosmosException ex) + { + Assert.AreEqual(HttpStatusCode.BadRequest, ex.StatusCode); + Assert.IsTrue(ex.Message.Contains("ClientEncryptionKey with id '[unknownKey]' does not exist.")); + } + } + [TestMethod] public async Task ClientEncryptionPolicyTest() { + DatabaseInlineCore databaseInlineCore = (DatabaseInlineCore)this.cosmosDatabase; + await TestCommon.CreateClientEncryptionKey("dekId1", databaseInlineCore); + await TestCommon.CreateClientEncryptionKey("dekId2", databaseInlineCore); + string containerName = Guid.NewGuid().ToString(); string partitionKeyPath = "/users"; Collection paths = new Collection() @@ -1371,6 +1403,32 @@ public async Task ClientEncryptionPolicyTest() ContainerResponse readResponse = await container.ReadContainerAsync(); Assert.AreEqual(HttpStatusCode.Created, containerResponse.StatusCode); Assert.IsNotNull(readResponse.Resource.ClientEncryptionPolicy); + + // replace without updating CEP should be successful + readResponse.Resource.IndexingPolicy = new Cosmos.IndexingPolicy() + { + IndexingMode = Cosmos.IndexingMode.None, + Automatic = false + }; + + containerResponse = await container.ReplaceContainerAsync(readResponse.Resource); + Assert.AreEqual(HttpStatusCode.OK, containerResponse.StatusCode); + Assert.AreEqual(Cosmos.IndexingMode.None, containerResponse.Resource.IndexingPolicy.IndexingMode); + Assert.IsFalse(containerResponse.Resource.IndexingPolicy.Automatic); + + // update CEP and attempt replace + readResponse.Resource.ClientEncryptionPolicy = null; + try + { + await container.ReplaceContainerAsync(readResponse.Resource); + + Assert.Fail("ReplaceCollection with update to ClientEncryptionPolicy should have failed."); + } + catch (CosmosException ex) + { + Assert.AreEqual(HttpStatusCode.BadRequest, ex.StatusCode); + Assert.IsTrue(ex.Message.Contains("'clientEncryptionPolicy' cannot be changed as part of collection replace operation.")); + } } [TestMethod] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Fluent/ContainerSettingsTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Fluent/ContainerSettingsTests.cs index bfb95b1de7..2094a17ea4 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Fluent/ContainerSettingsTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Fluent/ContainerSettingsTests.cs @@ -34,6 +34,9 @@ public async Task Cleanup() [TestMethod] public async Task ContainerContractTest() { + DatabaseInlineCore databaseInlineCore = (DatabaseInlineCore)this.database; + await TestCommon.CreateClientEncryptionKey("dekId", databaseInlineCore); + ClientEncryptionIncludedPath clientEncryptionIncludedPath1 = new ClientEncryptionIncludedPath() { Path = "/path", @@ -558,7 +561,7 @@ public async Task TimeToLivePropertyPath() containerResponse = await this.database.DefineContainer(containerName, partitionKeyPath) .WithTimeToLivePropertyPath("/creationDate") .CreateAsync(); - Assert.Fail("CreateColleciton with TtlPropertyPath and with no DefaultTimeToLive should have failed."); + Assert.Fail("CreateCollection with TtlPropertyPath and with no DefaultTimeToLive should have failed."); } catch (CosmosException exeption) { @@ -593,12 +596,17 @@ public async Task TimeToLivePropertyPath() [TestMethod] public async Task WithClientEncryptionPolicyTest() { + // create ClientEncryptionKeys + DatabaseInlineCore databaseInlineCore = (DatabaseInlineCore)this.database; + await TestCommon.CreateClientEncryptionKey("dekId1", databaseInlineCore); + await TestCommon.CreateClientEncryptionKey("dekId2", databaseInlineCore); + string containerName = Guid.NewGuid().ToString(); string partitionKeyPath = "/users"; ClientEncryptionIncludedPath path1 = new ClientEncryptionIncludedPath() { Path = "/path1", - ClientEncryptionKeyId = "key1", + ClientEncryptionKeyId = "dekId1", EncryptionType = "Randomized", EncryptionAlgorithm = "AEAD_AES_256_CBC_HMAC_SHA256" }; @@ -606,7 +614,7 @@ public async Task WithClientEncryptionPolicyTest() ClientEncryptionIncludedPath path2 = new ClientEncryptionIncludedPath() { Path = "/path2", - ClientEncryptionKeyId = "key2", + ClientEncryptionKeyId = "dekId2", EncryptionType = "Randomized", EncryptionAlgorithm = "AEAD_AES_256_CBC_HMAC_SHA256", }; @@ -632,6 +640,20 @@ public async Task WithClientEncryptionPolicyTest() ContainerResponse readResponse = await container.ReadContainerAsync(); Assert.AreEqual(HttpStatusCode.Created, containerResponse.StatusCode); Assert.IsNotNull(readResponse.Resource.ClientEncryptionPolicy); + + // update CEP and replace container + readResponse.Resource.ClientEncryptionPolicy = null; + try + { + await container.ReplaceContainerAsync(readResponse.Resource); + + Assert.Fail("ReplaceCollection with update to ClientEncryptionPolicy should have failed."); + } + catch (CosmosException ex) + { + Assert.AreEqual(HttpStatusCode.BadRequest, ex.StatusCode); + Assert.IsTrue(ex.Message.Contains("'clientEncryptionPolicy' cannot be changed as part of collection replace operation.")); + } } [TestMethod] @@ -655,7 +677,7 @@ public async Task WithClientEncryptionPolicyFailureTest() .Attach() .CreateAsync(); - Assert.Fail("CreateColleciton with invalid ClientEncryptionPolicy should have failed."); + Assert.Fail("CreateCollection with invalid ClientEncryptionPolicy should have failed."); } catch (ArgumentNullException ex) { @@ -673,7 +695,7 @@ public async Task WithClientEncryptionPolicyFailureTest() .Attach() .CreateAsync(); - Assert.Fail("CreateColleciton with invalid ClientEncryptionPolicy should have failed."); + Assert.Fail("CreateCollection with invalid ClientEncryptionPolicy should have failed."); } catch (ArgumentException ex) { @@ -691,7 +713,7 @@ public async Task WithClientEncryptionPolicyFailureTest() .Attach() .CreateAsync(); - Assert.Fail("CreateColleciton with invalid ClientEncryptionPolicy should have failed."); + Assert.Fail("CreateCollection with invalid ClientEncryptionPolicy should have failed."); } catch (ArgumentException ex) { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs index 1dfc3b3a65..7126b21cca 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Utils/TestCommon.cs @@ -12,6 +12,7 @@ namespace Microsoft.Azure.Cosmos.SDK.EmulatorTests using System.Linq; using System.Net; using System.Net.Http; + using System.Security.Cryptography; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -341,6 +342,41 @@ internal static void RouteToTheOnlyPartition(DocumentClient client, DocumentServ request.RouteTo(new PartitionKeyRangeIdentity(collection.ResourceId, ranges.Single().Id)); } + internal static async Task CreateClientEncryptionKey( + string dekId, + DatabaseInlineCore databaseInlineCore) + { + EncryptionKeyWrapMetadata metadata = new EncryptionKeyWrapMetadata("custom", dekId, "tempMetadata"); + + byte[] wrappedDataEncryptionKey = new byte[32]; + // Generate random bytes cryptographically. + using (RNGCryptoServiceProvider rngCsp = new RNGCryptoServiceProvider()) + { + rngCsp.GetBytes(wrappedDataEncryptionKey); + } + + ClientEncryptionKeyProperties clientEncryptionKeyProperties = new ClientEncryptionKeyProperties( + dekId, + "AEAD_AES_256_CBC_HMAC_SHA256", + wrappedDataEncryptionKey, + metadata); + + try + { + await databaseInlineCore.CreateClientEncryptionKeyAsync(clientEncryptionKeyProperties); + } + catch(CosmosException ex) + { + if (ex.StatusCode == HttpStatusCode.Conflict && + ex.Message.Contains("Resource with specified id, name, or unique index already exists.")) + { + return; + } + + throw; + } + } + internal static Database CreateOrGetDatabase(DocumentClient client) { IList databases = TestCommon.ListAll( diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml index a3e7c471b0..4da82692e7 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/BaselineTest/TestBaseline/TraceWriterBaselineTests.ScenariosAsync.xml @@ -111,15 +111,15 @@ │ └── Read Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) ReadFeed-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Read Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) ReadFeed-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Read Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) ReadFeed-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Read Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds └── MoveNextAsync(00000000-0000-0000-0000-000000000000) ReadFeed-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds @@ -1373,7 +1373,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { @@ -1432,7 +1432,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { @@ -1491,7 +1491,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { @@ -1620,7 +1620,7 @@ │ └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── ChangeFeed MoveNextAsync(00000000-0000-0000-0000-000000000000) ChangeFeed-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── ChangeFeed MoveNextAsync(00000000-0000-0000-0000-000000000000) ChangeFeed-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds @@ -1648,7 +1648,7 @@ │ └── [BF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Change Feed Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── [7F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,9F-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds @@ -2083,7 +2083,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { @@ -2493,7 +2493,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { @@ -2735,10 +2735,10 @@ │ └── [BF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Query Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Query Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds - │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds + │ └── [DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds │ └── Query Transport(00000000-0000-0000-0000-000000000000) Transport-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds ├── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds └── MoveNextAsync(00000000-0000-0000-0000-000000000000) Pagination-Component MemberName@FilePath:42 12:00:00:000 0.00 milliseconds @@ -3459,7 +3459,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { @@ -3503,7 +3503,7 @@ "data": {}, "children": [ { - "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,) move next", + "name": "[DF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF,FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF-FF) move next", "id": "00000000-0000-0000-0000-000000000000", "component": "Pagination", "caller info": { diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs index cf425f7ba5..379fdf599f 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/CrossPartitionPartitionRangeEnumeratorTests.cs @@ -6,13 +6,18 @@ namespace Microsoft.Azure.Cosmos.Tests.Pagination { using System; using System.Collections.Generic; + using System.IO; using System.Linq; + using System.Text; + using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos.Pagination; using Microsoft.Azure.Cosmos.Query.Core.Monads; using Microsoft.Azure.Cosmos.ReadFeed.Pagination; + using Microsoft.Azure.Cosmos.Resource.CosmosExceptions; using Microsoft.Azure.Cosmos.Tracing; using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; [TestClass] public sealed class CrossPartitionPartitionRangeEnumeratorTests @@ -20,24 +25,184 @@ public sealed class CrossPartitionPartitionRangeEnumeratorTests [TestMethod] public async Task Test429sAsync() { - Implementation implementation = new Implementation(); + Implementation implementation = new Implementation(false); await implementation.Test429sAsync(); } [TestMethod] public async Task Test429sWithContinuationsAsync() { - Implementation implementation = new Implementation(); + Implementation implementation = new Implementation(false); await implementation.Test429sWithContinuationsAsync(); } [TestMethod] public async Task TestEmptyPages() { - Implementation implementation = new Implementation(); + Implementation implementation = new Implementation(false); await implementation.TestEmptyPages(); } + [TestMethod] + public async Task TestMergeToSinglePartition() + { + Implementation implementation = new Implementation(true); + await implementation.TestMergeToSinglePartition(); + } + + // Validates that on a merge (split with 1 result) we do not create new child enumerators for the merge result + [TestMethod] + public async Task OnMergeRequeueRange() + { + // We expect only creation of enumerators for the original ranges, not any child ranges + List createdEnumerators = new List(); + PartitionRangePageAsyncEnumerator createEnumerator( + FeedRangeState feedRangeState) + { + EnumeratorThatSplits enumerator = new EnumeratorThatSplits(feedRangeState, default, createdEnumerators.Count == 0); + createdEnumerators.Add(enumerator); + return enumerator; + } + + // We expect a request for children and we return the merged range + Mock feedRangeProvider = new Mock(); + feedRangeProvider.Setup(p => p.GetChildRangeAsync( + It.Is(splitRange => ((FeedRangeEpk)splitRange).Range.Min == "" && ((FeedRangeEpk)splitRange).Range.Max == "A"), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new List() { + FeedRangeEpk.FullRange}); + + CrossPartitionRangePageAsyncEnumerator enumerator = new CrossPartitionRangePageAsyncEnumerator( + feedRangeProvider: feedRangeProvider.Object, + createPartitionRangeEnumerator: createEnumerator, + comparer: null, + maxConcurrency: 0, + cancellationToken: default, + state: new CrossFeedRangeState( + new FeedRangeState[] + { + // start with 2 ranges + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("", "A", true, false)), ReadFeedState.Beginning()), + new FeedRangeState(new FeedRangeEpk(new Documents.Routing.Range("A", "FF", true, false)), ReadFeedState.Beginning()) + })); + + // Trigger merge, should requeue and read second enumerator + await enumerator.MoveNextAsync(); + + // Should read first enumerator again + await enumerator.MoveNextAsync(); + + Assert.AreEqual(2, createdEnumerators.Count, "Should only create the original 2 enumerators"); + Assert.AreEqual("", ((FeedRangeEpk)createdEnumerators[0].FeedRangeState.FeedRange).Range.Min); + Assert.AreEqual("A", ((FeedRangeEpk)createdEnumerators[0].FeedRangeState.FeedRange).Range.Max); + Assert.AreEqual("A", ((FeedRangeEpk)createdEnumerators[1].FeedRangeState.FeedRange).Range.Min); + Assert.AreEqual("FF", ((FeedRangeEpk)createdEnumerators[1].FeedRangeState.FeedRange).Range.Max); + + Assert.AreEqual(2, createdEnumerators[0].GetNextPageAsyncCounter, "First enumerator should have been requeued and called again"); + Assert.AreEqual(1, createdEnumerators[1].GetNextPageAsyncCounter, "Second enumerator should be used once"); + } + + // Validates that on a split we create children enumerators and use them + [TestMethod] + public async Task OnSplitQueueNewEnumerators() + { + // We expect creation of the initial full range enumerator and the 2 children + List createdEnumerators = new List(); + PartitionRangePageAsyncEnumerator createEnumerator( + FeedRangeState feedRangeState) + { + EnumeratorThatSplits enumerator = new EnumeratorThatSplits(feedRangeState, default, createdEnumerators.Count == 0); + createdEnumerators.Add(enumerator); + return enumerator; + } + + // We expect a request for children and we return the new children + Mock feedRangeProvider = new Mock(); + feedRangeProvider.Setup(p => p.GetChildRangeAsync( + It.Is(splitRange => ((FeedRangeEpk)splitRange).Range.Min == FeedRangeEpk.FullRange.Range.Min && ((FeedRangeEpk)splitRange).Range.Max == FeedRangeEpk.FullRange.Range.Max), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new List() { + new FeedRangeEpk(new Documents.Routing.Range("", "A", true, false)), + new FeedRangeEpk(new Documents.Routing.Range("A", "FF", true, false))}); + + CrossPartitionRangePageAsyncEnumerator enumerator = new CrossPartitionRangePageAsyncEnumerator( + feedRangeProvider: feedRangeProvider.Object, + createPartitionRangeEnumerator: createEnumerator, + comparer: null, + maxConcurrency: 0, + cancellationToken: default, + state: new CrossFeedRangeState( + new FeedRangeState[] + { + // start with 1 range + new FeedRangeState(FeedRangeEpk.FullRange, ReadFeedState.Beginning()) + })); + + // Trigger split, should create children and call first children + await enumerator.MoveNextAsync(); + + // Should read second children + await enumerator.MoveNextAsync(); + + Assert.AreEqual(3, createdEnumerators.Count, "Should have the original enumerator and the children"); + Assert.AreEqual(FeedRangeEpk.FullRange.Range.Min, ((FeedRangeEpk)createdEnumerators[0].FeedRangeState.FeedRange).Range.Min); + Assert.AreEqual(FeedRangeEpk.FullRange.Range.Max, ((FeedRangeEpk)createdEnumerators[0].FeedRangeState.FeedRange).Range.Max); + Assert.AreEqual("", ((FeedRangeEpk)createdEnumerators[1].FeedRangeState.FeedRange).Range.Min); + Assert.AreEqual("A", ((FeedRangeEpk)createdEnumerators[1].FeedRangeState.FeedRange).Range.Max); + Assert.AreEqual("A", ((FeedRangeEpk)createdEnumerators[2].FeedRangeState.FeedRange).Range.Min); + Assert.AreEqual("FF", ((FeedRangeEpk)createdEnumerators[2].FeedRangeState.FeedRange).Range.Max); + + Assert.AreEqual(1, createdEnumerators[0].GetNextPageAsyncCounter, "First enumerator should have been called once"); + Assert.AreEqual(1, createdEnumerators[1].GetNextPageAsyncCounter, "Second enumerator should have been called once"); + Assert.AreEqual(1, createdEnumerators[2].GetNextPageAsyncCounter, "Second enumerator should not be used"); + } + + private class EnumeratorThatSplits : PartitionRangePageAsyncEnumerator + { + private readonly bool throwError; + + public EnumeratorThatSplits( + FeedRangeState feedRangeState, + CancellationToken cancellationToken, + bool throwError = true) + : base(feedRangeState, cancellationToken) + { + this.throwError = throwError; + } + + public override ValueTask DisposeAsync() + { + throw new NotImplementedException(); + } + + public int GetNextPageAsyncCounter { get; private set; } + + protected override Task> GetNextPageAsync(ITrace trace, CancellationToken cancellationToken) + { + this.GetNextPageAsyncCounter++; + + if (this.GetNextPageAsyncCounter == 1 + && this.throwError) + { + CosmosException splitError = new CosmosException("merge", System.Net.HttpStatusCode.Gone, (int)Documents.SubStatusCodes.PartitionKeyRangeGone, string.Empty, 0); + TryCatch state = TryCatch.FromException(splitError); + return Task.FromResult(state); + } + else + { + return Task.FromResult(TryCatch.FromResult( + new ReadFeedPage( + new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}")), + requestCharge: 1, + activityId: Guid.NewGuid().ToString(), + additionalHeaders: null, + state: ReadFeedState.Beginning()))); + } + } + } + [TestMethod] [DataRow(false, false, false, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: false")] [DataRow(false, false, true, DisplayName = "Use State: false, Allow Splits: false, Allow Merges: true")] @@ -49,18 +214,90 @@ public async Task TestEmptyPages() [DataRow(true, true, true, DisplayName = "Use State: true, Allow Splits: true, Allow Merges: true")] public async Task TestSplitAndMergeAsync(bool useState, bool allowSplits, bool allowMerges) { - Implementation implementation = new Implementation(); + Implementation implementation = new Implementation(singlePartition: false); await implementation.TestSplitAndMergeImplementationAsync(useState, allowSplits, allowMerges); } private sealed class Implementation : PartitionRangeEnumeratorTests, CrossFeedRangeState> { - public Implementation() - : base(singlePartition: false) + enum TriState { NotReady, Ready, Done }; + + public Implementation(bool singlePartition) + : base(singlePartition) + { + this.ShouldMerge = TriState.NotReady; + } + + private TriState ShouldMerge { get; set; } + + private IDocumentContainer DocumentContainer { get; set; } + + private async Task ShouldReturnFailure() { + if (this.ShouldMerge == TriState.Ready) + { + await this.DocumentContainer.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default); + List ranges = await this.DocumentContainer.GetFeedRangesAsync( + trace: NoOpTrace.Singleton, + cancellationToken: default); + + await this.DocumentContainer.MergeAsync(ranges[0], ranges[1], default); + await this.DocumentContainer.RefreshProviderAsync(NoOpTrace.Singleton, default); + this.ShouldMerge = TriState.Done; + + return new CosmosException( + message: "PKRange was split/merged", + statusCode: System.Net.HttpStatusCode.Gone, + subStatusCode: (int)Documents.SubStatusCodes.PartitionKeyRangeGone, + activityId: "BC0CCDA5-D378-4922-B8B0-D51D745B9139", + requestCharge: 0.0); + } + else + { + return null; + } + } + + public async Task TestMergeToSinglePartition() + { + int numItems = 1000; + FlakyDocumentContainer.FailureConfigs config = new FlakyDocumentContainer.FailureConfigs( + inject429s: false, + injectEmptyPages: false, + shouldReturnFailure: this.ShouldReturnFailure); + + this.DocumentContainer = await this.CreateDocumentContainerAsync(numItems: numItems, failureConfigs: config); + + await this.DocumentContainer.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default); + List ranges = await this.DocumentContainer.GetFeedRangesAsync( + trace: NoOpTrace.Singleton, + cancellationToken: default); + await this.DocumentContainer.SplitAsync(ranges.First(), cancellationToken: default); + + IAsyncEnumerator>> enumerator = this.CreateEnumerator(this.DocumentContainer); + List identifiers = new List(); + int iteration = 0; + while (await enumerator.MoveNextAsync()) + { + TryCatch> tryGetPage = enumerator.Current; + tryGetPage.ThrowIfFailed(); + + IReadOnlyList records = this.GetRecordsFromPage(tryGetPage.Result); + foreach (Record record in records) + { + identifiers.Add(record.Payload["pk"].ToString()); + } + + ++iteration; + if (iteration == 1) + { + this.ShouldMerge = TriState.Ready; + } + } + + Assert.AreEqual(numItems, identifiers.Count); } - [TestMethod] public async Task TestSplitAndMergeImplementationAsync(bool useState, bool allowSplits, bool allowMerges) { int numItems = 1000; diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/FlakyDocumentContainer.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/FlakyDocumentContainer.cs index fcdc940dd7..cc038ff4ed 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/FlakyDocumentContainer.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/FlakyDocumentContainer.cs @@ -101,7 +101,7 @@ public Task> MonadicReadItemAsync( cancellationToken); } - public Task> MonadicReadFeedAsync( + public async Task> MonadicReadFeedAsync( FeedRangeState feedRangeState, ReadFeedPaginationOptions readFeedPaginationOptions, ITrace trace, @@ -115,31 +115,36 @@ public Task> MonadicReadFeedAsync( if (this.ShouldReturn429()) { - return Throttle.ForReadFeed; + return await Throttle.ForReadFeed; } if (this.ShouldReturnEmptyPage()) { // We can't return a null continuation, since that signals the query has ended. ReadFeedState nonNullState = readFeedState ?? ReadFeedNotStartedState; - return Task.FromResult( - TryCatch.FromResult( + return TryCatch.FromResult( new ReadFeedPage( new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}")), requestCharge: 42, activityId: Guid.NewGuid().ToString(), additionalHeaders: null, - state: nonNullState))); + state: nonNullState)); } - return this.documentContainer.MonadicReadFeedAsync( + Exception failure = await this.ShouldReturnFailure(); + if (failure != null) + { + return TryCatch.FromException(failure); + } + + return await this.documentContainer.MonadicReadFeedAsync( feedRangeState, readFeedPaginationOptions, trace, cancellationToken); } - public Task> MonadicQueryAsync( + public async Task> MonadicQueryAsync( SqlQuerySpec sqlQuerySpec, FeedRangeState feedRangeState, QueryPaginationOptions queryPaginationOptions, @@ -153,13 +158,12 @@ public Task> MonadicQueryAsync( if (this.ShouldReturn429()) { - return Throttle.ForQuery; + return await Throttle.ForQuery; } if (this.ShouldReturnEmptyPage()) { - return Task.FromResult( - TryCatch.FromResult( + return TryCatch.FromResult( new QueryPage( documents: new List(), requestCharge: 42, @@ -168,10 +172,16 @@ public Task> MonadicQueryAsync( cosmosQueryExecutionInfo: default, disallowContinuationTokenMessage: default, additionalHeaders: default, - state: feedRangeState.State ?? StateForStartedButNoDocumentsReturned))); + state: feedRangeState.State ?? StateForStartedButNoDocumentsReturned)); } - return this.documentContainer.MonadicQueryAsync( + Exception failure = await this.ShouldReturnFailure(); + if (failure != null) + { + return TryCatch.FromException(failure); + } + + return await this.documentContainer.MonadicQueryAsync( sqlQuerySpec, feedRangeState, queryPaginationOptions, @@ -179,7 +189,7 @@ public Task> MonadicQueryAsync( cancellationToken); } - public Task> MonadicChangeFeedAsync( + public async Task> MonadicChangeFeedAsync( FeedRangeState feedRangeState, ChangeFeedPaginationOptions changeFeedPaginationOptions, ITrace trace, @@ -187,22 +197,32 @@ public Task> MonadicChangeFeedAsync( { if (this.ShouldReturn429()) { - return Throttle.ForChangeFeed; + return await Throttle.ForChangeFeed; } if (this.ShouldReturnEmptyPage()) { - return Task.FromResult( - TryCatch.FromResult( + return TryCatch.FromResult( new ChangeFeedSuccessPage( content: new MemoryStream(Encoding.UTF8.GetBytes("{\"Documents\": [], \"_count\": 0, \"_rid\": \"asdf\"}")), requestCharge: 42, activityId: Guid.NewGuid().ToString(), additionalHeaders: default, - state: feedRangeState.State))); + state: feedRangeState.State)); } - return this.documentContainer.MonadicChangeFeedAsync( + if (this.ShouldThrowException(out Exception exception)) + { + throw exception; + } + + Exception failure = await this.ShouldReturnFailure(); + if (failure != null) + { + return TryCatch.FromException(failure); + } + + return await this.documentContainer.MonadicChangeFeedAsync( feedRangeState, changeFeedPaginationOptions, trace, @@ -253,17 +273,50 @@ private bool ShouldReturnEmptyPage() => (this.failureConfigs != null) && this.failureConfigs.InjectEmptyPages && ((this.random.Next() % 2) == 0); + private bool ShouldThrowException(out Exception exception) + { + exception = this.failureConfigs.ThrowException; + return this.failureConfigs != null && this.failureConfigs.ThrowException != null; + } + + private Task ShouldReturnFailure() + { + return this.failureConfigs == null ? Task.FromResult(null) : this.failureConfigs.ReturnFailure(); + } + public sealed class FailureConfigs { - public FailureConfigs(bool inject429s, bool injectEmptyPages) + public delegate Task ShouldReturnFailure(); + + public FailureConfigs( + bool inject429s, + bool injectEmptyPages, + Exception throwException = null, + Exception returnFailure = null) { this.Inject429s = inject429s; this.InjectEmptyPages = injectEmptyPages; + this.ThrowException = throwException; + this.ReturnFailure = () => Task.FromResult(returnFailure); + } + + public FailureConfigs( + bool inject429s, + bool injectEmptyPages, + ShouldReturnFailure shouldReturnFailure) + { + this.Inject429s = inject429s; + this.InjectEmptyPages = injectEmptyPages; + this.ReturnFailure = shouldReturnFailure; } public bool Inject429s { get; } public bool InjectEmptyPages { get; } + + public Exception ThrowException { get; } + + public ShouldReturnFailure ReturnFailure { get; } } } } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs index e78fdb1b31..efc3d8b22d 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Pagination/InMemoryContainer.cs @@ -47,7 +47,7 @@ public InMemoryContainer( PartitionKeyDefinition partitionKeyDefinition) { this.partitionKeyDefinition = partitionKeyDefinition ?? throw new ArgumentNullException(nameof(partitionKeyDefinition)); - PartitionKeyHashRange fullRange = new PartitionKeyHashRange(startInclusive: null, endExclusive: null); + PartitionKeyHashRange fullRange = new PartitionKeyHashRange(startInclusive: null, endExclusive: new PartitionKeyHash(Cosmos.UInt128.MaxValue)); PartitionKeyHashRanges partitionKeyHashRanges = PartitionKeyHashRanges.Create(new PartitionKeyHashRange[] { fullRange }); this.partitionedRecords = new PartitionKeyHashRangeDictionary(partitionKeyHashRanges); this.partitionedRecords[fullRange] = new Records(); @@ -1278,7 +1278,7 @@ private TryCatch MonadicGetPartitionKeyRangeIdFromFeedRange(FeedRange feedR private static PartitionKeyHashRange FeedRangeEpkToHashRange(FeedRangeEpk feedRangeEpk) { PartitionKeyHash? start = feedRangeEpk.Range.Min == string.Empty ? (PartitionKeyHash?)null : PartitionKeyHash.Parse(feedRangeEpk.Range.Min); - PartitionKeyHash? end = feedRangeEpk.Range.Max == string.Empty ? (PartitionKeyHash?)null : PartitionKeyHash.Parse(feedRangeEpk.Range.Max); + PartitionKeyHash? end = feedRangeEpk.Range.Max == string.Empty || feedRangeEpk.Range.Max == "FF" ? (PartitionKeyHash?)null : PartitionKeyHash.Parse(feedRangeEpk.Range.Max); PartitionKeyHashRange hashRange = new PartitionKeyHashRange(start, end); return hashRange; } diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs index e46344df8d..f1528df23a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/Query/Pipeline/FullPipelineTests.cs @@ -58,6 +58,46 @@ public class FullPipelineTests Version = PartitionKeyDefinitionVersion.V2, }; + [TestMethod] + public async Task TestMerge() + { + List documents = Enumerable + .Range(0, 100) + .Select(x => CosmosObject.Parse($"{{\"pk\" : {x} }}")) + .ToList(); + + MergeTestUtil mergeTest = new MergeTestUtil(); + mergeTest.DocumentContainer = await CreateDocumentContainerAsync( + documents: documents, + numPartitions: 2, + failureConfigs: new FlakyDocumentContainer.FailureConfigs( + inject429s: false, + injectEmptyPages: false, + shouldReturnFailure: mergeTest.ShouldReturnFailure)); + + string query = "SELECT * FROM c ORDER BY c._ts"; + int pageSize = 10; + IQueryPipelineStage pipelineStage = CreatePipeline(mergeTest.DocumentContainer, query, pageSize); + + List elements = new List(); + int iteration = 0; + while (await pipelineStage.MoveNextAsync()) + { + TryCatch tryGetQueryPage = pipelineStage.Current; + tryGetQueryPage.ThrowIfFailed(); + + elements.AddRange(tryGetQueryPage.Result.Documents); + ++iteration; + + if (iteration == 1) + { + mergeTest.ShouldMerge = MergeTestUtil.TriState.Ready; + } + } + + Assert.AreEqual(expected: documents.Count, actual: elements.Count); + } + [TestMethod] public async Task SelectStar() { @@ -231,9 +271,13 @@ public async Task Tracing() private static async Task> ExecuteQueryAsync( string query, IReadOnlyList documents, + IDocumentContainer documentContainer = null, int pageSize = 10) { - IDocumentContainer documentContainer = await CreateDocumentContainerAsync(documents); + if (documentContainer == null) + { + documentContainer = await CreateDocumentContainerAsync(documents); + } List resultsFromDrainWithoutState = await DrainWithoutStateAsync(query, documentContainer, pageSize); List resultsFromDrainWithState = await DrainWithStateAsync(query, documentContainer, pageSize); @@ -303,6 +347,7 @@ private static async Task> DrainWithStateAsync(string query, private static async Task CreateDocumentContainerAsync( IReadOnlyList documents, + int numPartitions = 3, FlakyDocumentContainer.FailureConfigs failureConfigs = null) { IMonadicDocumentContainer monadicDocumentContainer = new InMemoryContainer(partitionKeyDefinition); @@ -313,7 +358,7 @@ private static async Task CreateDocumentContainerAsync( DocumentContainer documentContainer = new DocumentContainer(monadicDocumentContainer); - for (int i = 0; i < 3; i++) + for (int i = 0; i < numPartitions; i++) { IReadOnlyList ranges = await documentContainer.GetFeedRangesAsync( trace: NoOpTrace.Singleton, @@ -352,7 +397,7 @@ private static IQueryPipelineStage CreatePipeline(IDocumentContainer documentCon documentContainer.GetFeedRangesAsync(NoOpTrace.Singleton, cancellationToken: default).Result, partitionKey: null, GetQueryPlan(query), - queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: 10), + queryPaginationOptions: new QueryPaginationOptions(pageSizeHint: pageSize), maxConcurrency: 10, requestCancellationToken: default, requestContinuationToken: state); @@ -375,5 +420,39 @@ private static QueryInfo GetQueryPlan(string query) info.ThrowIfFailed(); return info.Result.QueryInfo; } + + private class MergeTestUtil + { + public enum TriState { NotReady, Ready, Done }; + + public IDocumentContainer DocumentContainer { get; set; } + + public TriState ShouldMerge { get; set; } + + public async Task ShouldReturnFailure() + { + if (this.ShouldMerge == TriState.Ready) + { + await this.DocumentContainer.RefreshProviderAsync(NoOpTrace.Singleton, cancellationToken: default); + List ranges = await this.DocumentContainer.GetFeedRangesAsync( + trace: NoOpTrace.Singleton, + cancellationToken: default); + + await this.DocumentContainer.MergeAsync(ranges[0], ranges[1], default); + this.ShouldMerge = TriState.Done; + + return new CosmosException( + message: "PKRange was split/merged", + statusCode: System.Net.HttpStatusCode.Gone, + subStatusCode: (int)Documents.SubStatusCodes.PartitionKeyRangeGone, + activityId: "BC0CCDA5-D378-4922-B8B0-D51D745B9139", + requestCharge: 0.0); + } + else + { + return null; + } + } + } } }