From b21e8a8e091aaa508787f0986672881edc2bb912 Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Sat, 23 Sep 2023 09:26:31 -0700 Subject: [PATCH] alwaysUsePagedFluxMaxItemCount (#36847) * alwaysUsePagedFluxMaxItemCount --------- Co-authored-by: annie-mac --- .../CosmosEncryptionAsyncContainer.java | 280 ++++++++++++------ .../CosmosEncryptionQueryTransformer.java | 84 ++++++ .../cosmos/rx/CosmosReadAllItemsTests.java | 77 +++++ .../cosmos/rx/OrderbyDocumentQueryTest.java | 8 +- .../cosmos/rx/ParallelDocumentQueryTest.java | 6 +- .../rx/SinglePartitionDocumentQueryTest.java | 2 +- .../com/azure/cosmos/rx/TopQueryTests.java | 16 +- sdk/cosmos/azure-cosmos/CHANGELOG.md | 2 + .../azure/cosmos/CosmosAsyncContainer.java | 26 +- .../azure/cosmos/CosmosBridgeInternal.java | 26 -- .../ImplementationBridgeHelpers.java | 14 +- .../azure/cosmos/implementation/Utils.java | 13 +- .../CosmosChangeFeedRequestOptions.java | 16 - 13 files changed, 404 insertions(+), 166 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/implementation/CosmosEncryptionQueryTransformer.java create mode 100644 sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/CosmosReadAllItemsTests.java diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index f6a4c84142f0a..423065fc912ff 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -3,11 +3,11 @@ package com.azure.cosmos.encryption; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosBridgeInternal; import com.azure.cosmos.CosmosException; import com.azure.cosmos.encryption.implementation.Constants; +import com.azure.cosmos.encryption.implementation.CosmosEncryptionQueryTransformer; import com.azure.cosmos.encryption.implementation.CosmosResponseFactory; import com.azure.cosmos.encryption.implementation.EncryptionImplementationBridgeHelpers; import com.azure.cosmos.encryption.implementation.EncryptionProcessor; @@ -62,9 +62,9 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import java.util.stream.Collectors; import static com.azure.cosmos.implementation.Utils.getEffectiveCosmosChangeFeedRequestOptions; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; @@ -792,34 +792,6 @@ private Mono> setByteArrayContent(CosmosItemResponse< ).defaultIfEmpty(rsp); } - private Function>> queryDecryptionTransformer(Class classType, - boolean isChangeFeed, - Function>> func) { - return func.andThen(flux -> - flux.publishOn(encryptionScheduler) - .flatMap( - page -> { - boolean useEtagAsContinuation = isChangeFeed; - boolean isNoChangesResponse = isChangeFeed ? - ModelBridgeInternal.getNoChangesFromFeedResponse(page) - : false; - List> jsonNodeArrayMonoList = - page.getResults().stream().map(jsonNode -> decryptResponseNode(jsonNode)).collect(Collectors.toList()); - return Flux.concat(jsonNodeArrayMonoList).map( - item -> getItemDeserializer().convert(classType, item) - ).collectList().map(itemList -> BridgeInternal.createFeedResponseWithQueryMetrics(itemList, - page.getResponseHeaders(), - BridgeInternal.queryMetricsFromFeedResponse(page), - ModelBridgeInternal.getQueryPlanDiagnosticsContext(page), - useEtagAsContinuation, - isNoChangesResponse, - page.getCosmosDiagnostics()) - ); - } - ) - ); - } - private Mono> readItemHelper(String id, PartitionKey partitionKey, CosmosItemRequestOptions requestOptions, @@ -1027,92 +999,201 @@ private CosmosPagedFlux queryItemsHelper(SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions options, Class classType, boolean isRetry) { - setRequestHeaders(options); - CosmosQueryRequestOptions finalOptions = options; - Flux> tFlux = CosmosBridgeInternal.queryItemsInternal(container, sqlQuerySpec, options, - new Transformer() { - @Override - public Function>> transform(Function>> func) { - return queryDecryptionTransformer(classType, false, func); - } - }).byPage().onErrorResume(exception -> { - if (exception instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) exception; - if (!isRetry && isIncorrectContainerRid(cosmosException)) { - this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false); - return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).thenMany( - (CosmosPagedFlux.defer(() -> queryItemsHelper(sqlQuerySpec,finalOptions, classType, true).byPage()))); - } - } - return Mono.error(exception); - }); - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, finalOptions); - return tFlux; + AtomicBoolean shouldRetry = new AtomicBoolean(!isRetry); + + Transformer transformer = new CosmosEncryptionQueryTransformer( + this.encryptionScheduler, + this.getEncryptionProcessor(), + this.getItemDeserializer(), + classType, + false); + + Flux> result = this.transformQueryItemsInternal( + transformer, + sqlQuerySpec, + options, + pagedFluxOptions + ).apply(pagedFluxOptions); + + return result + .onErrorResume(exception -> { + if (exception instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) exception; + if (shouldRetry.get() && isIncorrectContainerRid(cosmosException)) { + // stale cache, refresh caches and then retry + this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false); + shouldRetry.set(false); + + return this.encryptionProcessor + .initializeEncryptionSettingsAsync(true) + .thenMany( + Flux.defer(() -> { + return this.transformQueryItemsInternal( + transformer, + sqlQuerySpec, + options, + pagedFluxOptions + ).apply(pagedFluxOptions); + }) + ); + } + } + return Mono.error(exception); + }); }); } + private Function>> transformQueryItemsInternal( + Transformer transformer, + SqlQuerySpec sqlQuerySpec, + CosmosQueryRequestOptions queryRequestOptions, + CosmosPagedFluxOptions pagedFluxOptions) { + + CosmosQueryRequestOptions finalOptions = setRequestHeaders(queryRequestOptions); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, finalOptions); + + return transformer.transform( + cosmosAsyncContainerAccessor.queryItemsInternalFunc( + this.container, + sqlQuerySpec, + finalOptions, + JsonNode.class) + ); + } + + private Function>> transformQueryChangeFeedInternal( + Transformer transformer, + CosmosChangeFeedRequestOptions changeFeedRequestOptions, + CosmosPagedFluxOptions pagedFluxOptions) { + + CosmosChangeFeedRequestOptions finalOptions = setRequestHeaders(changeFeedRequestOptions);; + getEffectiveCosmosChangeFeedRequestOptions(pagedFluxOptions, finalOptions); + + return transformer.transform( + cosmosAsyncContainerAccessor + .queryChangeFeedInternalFunc( + this.container, + finalOptions, + JsonNode.class) + ); + } + + private Function>> transformQueryItemsWithMonoSqlQuerySpec( + Transformer transformer, + Mono sqlQuerySpecMono, + CosmosQueryRequestOptions options, + CosmosPagedFluxOptions pagedFluxOptions) { + + CosmosQueryRequestOptions finalOptions = setRequestHeaders(options); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, finalOptions); + + return transformer.transform( + cosmosAsyncContainerAccessor.queryItemsInternalFuncWithMonoSqlQuerySpec( + this.container, + sqlQuerySpecMono, + finalOptions, + JsonNode.class + ) + ); + } + private CosmosPagedFlux queryChangeFeedHelper(CosmosChangeFeedRequestOptions options, Class classType, boolean isRetry) { - setRequestHeaders(options); - CosmosChangeFeedRequestOptions finalOptions = options; - Flux> tFlux = - UtilBridgeInternal.createCosmosPagedFlux(((Transformer) func -> queryDecryptionTransformer(classType, - true, - func)).transform(cosmosAsyncContainerAccessor.queryChangeFeedInternalFunc(this.container, options, - JsonNode.class))).byPage().onErrorResume(exception -> { - if (exception instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) exception; - if (!isRetry && isIncorrectContainerRid(cosmosException)) { - this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false); - return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).thenMany( - (CosmosPagedFlux.defer(() -> queryChangeFeedHelper(finalOptions, classType, true).byPage()))); - } - } - return Mono.error(exception); - }); - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - getEffectiveCosmosChangeFeedRequestOptions(pagedFluxOptions, finalOptions); - return tFlux; + AtomicBoolean shouldRetry = new AtomicBoolean(!isRetry); + + Transformer transformer = new CosmosEncryptionQueryTransformer( + this.encryptionScheduler, + this.getEncryptionProcessor(), + this.getItemDeserializer(), + classType, + true); + + Flux> result = this.transformQueryChangeFeedInternal( + transformer, + options, + pagedFluxOptions + ).apply(pagedFluxOptions); + + return result + .onErrorResume(exception -> { + if (exception instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) exception; + if (shouldRetry.get() && isIncorrectContainerRid(cosmosException)) { + // stale cache, refresh caches and then retry + this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false); + shouldRetry.set(false); + + return this.encryptionProcessor + .initializeEncryptionSettingsAsync(true) + .thenMany( + Flux.defer(() -> { + return this.transformQueryChangeFeedInternal( + transformer, + options, + pagedFluxOptions + ).apply(pagedFluxOptions); + }) + ); + } + } + return Mono.error(exception); + }); }); } - private CosmosPagedFlux queryItemsHelperWithMonoSqlQuerySpec(Mono sqlQuerySpecMono, SqlQuerySpecWithEncryption sqlQuerySpecWithEncryption, CosmosQueryRequestOptions options, Class classType, boolean isRetry) { - setRequestHeaders(options); - CosmosQueryRequestOptions finalOptions = options; - - Flux> tFlux = CosmosBridgeInternal.queryItemsInternal(container, sqlQuerySpecMono, options, - new Transformer() { - @Override - public Function>> transform(Function>> func) { - return queryDecryptionTransformer(classType, false, func); - } - }).byPage().onErrorResume(exception -> { - if (exception instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) exception; - if (!isRetry && isIncorrectContainerRid(cosmosException)) { - this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false); - return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).thenMany( - (CosmosPagedFlux.defer(() -> queryItemsHelper(specWithEncryptionAccessor.getSqlQuerySpec(sqlQuerySpecWithEncryption), finalOptions, classType, true).byPage()))); - } - } - return Mono.error(exception); - }); - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, finalOptions); - return tFlux; + AtomicBoolean shouldRetry = new AtomicBoolean(!isRetry); + + Transformer transformer = new CosmosEncryptionQueryTransformer( + this.encryptionScheduler, + this.getEncryptionProcessor(), + this.getItemDeserializer(), + classType, + false); + + Flux> result = this.transformQueryItemsWithMonoSqlQuerySpec( + transformer, + sqlQuerySpecMono, + options, + pagedFluxOptions + ).apply(pagedFluxOptions); + + return result + .onErrorResume(exception -> { + if (exception instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) exception; + if (shouldRetry.get() && isIncorrectContainerRid(cosmosException)) { + // stale cache, refresh caches and then retry + this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false); + shouldRetry.set(false); + + return this.encryptionProcessor + .initializeEncryptionSettingsAsync(true) + .thenMany( + Flux.defer(() -> { + return this.transformQueryItemsInternal( + transformer, + specWithEncryptionAccessor.getSqlQuerySpec(sqlQuerySpecWithEncryption), + options, + pagedFluxOptions + ).apply(pagedFluxOptions); + }) + ); + } + } + return Mono.error(exception); + }); }); } @@ -1422,14 +1503,17 @@ private void setRequestHeaders(CosmosItemRequestOptions requestOptions) { cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid()); } - private void setRequestHeaders(CosmosQueryRequestOptions requestOptions) { + private CosmosQueryRequestOptions setRequestHeaders(CosmosQueryRequestOptions requestOptions) { cosmosQueryRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true"); cosmosQueryRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid()); + System.out.println("Setting collectionRid header " + this.encryptionProcessor.getContainerRid()); + return requestOptions; } - private void setRequestHeaders(CosmosChangeFeedRequestOptions requestOptions) { + private CosmosChangeFeedRequestOptions setRequestHeaders(CosmosChangeFeedRequestOptions requestOptions) { cosmosChangeFeedRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true"); cosmosChangeFeedRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid()); + return requestOptions; } private void setRequestHeaders(CosmosBatchRequestOptions requestOptions) { diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/implementation/CosmosEncryptionQueryTransformer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/implementation/CosmosEncryptionQueryTransformer.java new file mode 100644 index 0000000000000..c46cb04e00cf2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/implementation/CosmosEncryptionQueryTransformer.java @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.encryption.implementation; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.CosmosPagedFluxOptions; +import com.azure.cosmos.implementation.ItemDeserializer; +import com.azure.cosmos.implementation.query.Transformer; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import com.fasterxml.jackson.databind.JsonNode; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class CosmosEncryptionQueryTransformer implements Transformer { + private final Scheduler encryptionScheduler; + private final EncryptionProcessor encryptionProcessor; + private final ItemDeserializer itemDeserializer; + private final Class classType; + private final boolean isChangeFeed; + + public CosmosEncryptionQueryTransformer( + Scheduler encryptionScheduler, + EncryptionProcessor encryptionProcessor, + ItemDeserializer itemDeserializer, + Class classType, + Boolean isChangeFeed) { + this.encryptionScheduler = encryptionScheduler; + this.encryptionProcessor = encryptionProcessor; + this.itemDeserializer = itemDeserializer; + this.classType = classType; + this.isChangeFeed = isChangeFeed; + } + + @Override + public Function>> transform(Function>> func) { + return queryDecryptionTransformer(this.classType, this.isChangeFeed, func); + } + + private Function>> queryDecryptionTransformer( + Class classType, + boolean isChangeFeed, + Function>> func) { + return func.andThen(flux -> + flux.publishOn(encryptionScheduler) + .flatMap( + page -> { + boolean useEtagAsContinuation = isChangeFeed; + boolean isNoChangesResponse = isChangeFeed ? + ModelBridgeInternal.getNoChangesFromFeedResponse(page) + : false; + List> jsonNodeArrayMonoList = + page.getResults().stream().map(jsonNode -> decryptResponseNode(jsonNode)).collect(Collectors.toList()); + return Flux.concat(jsonNodeArrayMonoList).map( + item -> this.itemDeserializer.convert(classType, item) + ).collectList().map(itemList -> BridgeInternal.createFeedResponseWithQueryMetrics(itemList, + page.getResponseHeaders(), + BridgeInternal.queryMetricsFromFeedResponse(page), + ModelBridgeInternal.getQueryPlanDiagnosticsContext(page), + useEtagAsContinuation, + isNoChangesResponse, + page.getCosmosDiagnostics()) + ); + } + ) + ); + } + + Mono decryptResponseNode( + JsonNode jsonNode) { + + if (jsonNode == null) { + return Mono.empty(); + } + + return this.encryptionProcessor.decryptJsonNode( + jsonNode); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/CosmosReadAllItemsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/CosmosReadAllItemsTests.java new file mode 100644 index 0000000000000..55225bb23dfca --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/CosmosReadAllItemsTests.java @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.rx; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.TestObject; +import com.azure.cosmos.implementation.FeedResponseListValidator; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.util.CosmosPagedFlux; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.UUID; + +public class CosmosReadAllItemsTests extends TestSuiteBase { + private final static int TIMEOUT = 30000; + private CosmosAsyncClient client; + private CosmosAsyncContainer container; + + @Factory(dataProvider = "clientBuildersWithSessionConsistency") + public CosmosReadAllItemsTests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + @Test(groups = { "query" }, timeOut = 2 * TIMEOUT) + public void readMany_UsePageSizeInPagedFluxOption() { + + // first creating few items + String pkValue = UUID.randomUUID().toString(); + int itemCount = 10; + for (int i = 0; i < itemCount; i++) { + TestObject testObject = TestObject.create(pkValue); + this.container.createItem(testObject).block(); + } + + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + + FeedResponseListValidator validator1 = + new FeedResponseListValidator + .Builder() + .totalSize(itemCount) + .numberOfPages(2) + .build(); + CosmosPagedFlux queryObservable1 = + this.container.readAllItems(new PartitionKey(pkValue), cosmosQueryRequestOptions, TestObject.class); + + validateQuerySuccess(queryObservable1.byPage(5), validator1, TIMEOUT); + + CosmosQueryRequestOptions options = new CosmosQueryRequestOptions(); + options.setMaxDegreeOfParallelism(2); + + FeedResponseListValidator validator2 = + new FeedResponseListValidator + .Builder() + .totalSize(itemCount) + .numberOfPages(1) + .build(); + validateQuerySuccess(queryObservable1.byPage(), validator2, TIMEOUT); + } + + @AfterClass(groups = { "query" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() { + safeClose(client); + } + + @BeforeClass(groups = { "query" }, timeOut = SETUP_TIMEOUT) + public void before_TopQueryTests() { + this.client = getClientBuilder().buildAsyncClient(); + this.container = getSharedSinglePartitionCosmosContainer(client); + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java index 06c5599eb457c..fb9946dee6f5a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/OrderbyDocumentQueryTest.java @@ -445,8 +445,9 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc options.setPartitionKey(new PartitionKey("duplicatePartitionKeyValue")); CosmosPagedFlux queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class); + int preferredPageSize = 3; TestSubscriber> subscriber = new TestSubscriber<>(); - queryObservable.byPage(3).take(1).subscribe(subscriber); + queryObservable.byPage(preferredPageSize).take(1).subscribe(subscriber); subscriber.awaitTerminalEvent(); subscriber.assertComplete(); @@ -464,8 +465,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc List expectedDocs = createdDocuments.stream() .filter(d -> (StringUtils.equals("duplicatePartitionKeyValue", ModelBridgeInternal.getStringFromJsonSerializable(d,"mypk")))) .filter(d -> (ModelBridgeInternal.getIntFromJsonSerializable(d,"propScopedPartitionInt") > 2)).collect(Collectors.toList()); - Integer maxItemCount = ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(options); - int expectedPageSize = (expectedDocs.size() + maxItemCount - 1) / maxItemCount; + int expectedPageSize = (expectedDocs.size() + preferredPageSize - 1) / preferredPageSize; assertThat(expectedDocs).hasSize(10 - 3); @@ -481,7 +481,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc .requestChargeGreaterThanOrEqualTo(1.0).build()) .build(); - validateQuerySuccess(queryObservable.byPage(page.getContinuationToken()), validator); + validateQuerySuccess(queryObservable.byPage(page.getContinuationToken(), preferredPageSize), validator); } @Test(groups = { "query" }, timeOut = TIMEOUT) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java index d35c9d0a651f3..dac5caa05a339 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/ParallelDocumentQueryTest.java @@ -115,12 +115,14 @@ public void queryMetricEquality() throws Exception { options.setQueryMetricsEnabled(true); options.setMaxDegreeOfParallelism(0); + int preferredPageSize = 5; + CosmosPagedFlux queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class); - List> resultList1 = queryObservable.byPage(5).collectList().block(); + List> resultList1 = queryObservable.byPage(preferredPageSize).collectList().block(); options.setMaxDegreeOfParallelism(4); CosmosPagedFlux threadedQueryObs = createdCollection.queryItems(query, options, InternalObjectNode.class); - List> resultList2 = threadedQueryObs.byPage().collectList().block(); + List> resultList2 = threadedQueryObs.byPage(preferredPageSize).collectList().block(); assertThat(resultList1.size()).isEqualTo(resultList2.size()); for(int i = 0; i < resultList1.size(); i++){ diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java index 48bc2641609f7..432407f8e43ef 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/SinglePartitionDocumentQueryTest.java @@ -281,7 +281,7 @@ public void continuationToken() throws Exception { .allPagesSatisfy(new FeedResponseValidator.Builder() .requestChargeGreaterThanOrEqualTo(1.0).build()) .build(); - validateQuerySuccess(queryObservable.byPage(page.getContinuationToken()), validator); + validateQuerySuccess(queryObservable.byPage(page.getContinuationToken(), maxItemCount), validator); } @Test(groups = { "query" }, timeOut = TIMEOUT) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java index 11d78c17af4b4..c48b0f7d6fcce 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TopQueryTests.java @@ -58,6 +58,7 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception { int expectedTotalSize = 20; int expectedNumberOfPages = 3; + int pageSize = 9; int[] expectedPageLengths = new int[] { 9, 9, 2 }; for (int i = 0; i < 2; i++) { @@ -81,11 +82,24 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception { CosmosPagedFlux queryObservable3 = createdCollection.queryItems("SELECT TOP 20 * from c", options, InternalObjectNode.class); + // validate the pageSize in byPage() will be honored FeedResponseListValidator validator3 = new FeedResponseListValidator.Builder() .totalSize(expectedTotalSize).numberOfPages(expectedNumberOfPages).pageLengths(expectedPageLengths) .hasValidQueryMetrics(qmEnabled).build(); - validateQuerySuccess(queryObservable3.byPage(), validator3, TIMEOUT); + validateQuerySuccess(queryObservable3.byPage(pageSize), validator3, TIMEOUT); + + // validate default value will be used for byPage + FeedResponseListValidator validator4 = + new FeedResponseListValidator + .Builder() + .totalSize(expectedTotalSize) + .numberOfPages(1) + .pageLengths(new int[] { expectedTotalSize }) + .hasValidQueryMetrics(qmEnabled) + .build(); + + validateQuerySuccess(queryObservable3.byPage(), validator4, TIMEOUT); if (i == 0) { options.setPartitionKey(new PartitionKey(firstPk)); diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 46b702c519125..0eae5196a1f13 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -10,6 +10,8 @@ #### Bugs Fixed * Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599). +* Fixed an issue where `pageSize` from `byPage` is not always being honored. This only happens when the same `CosmosQueryRequestOptions` being used through different + requests, and different pageSize being used. See [PR 36847](https://github.com/Azure/azure-sdk-for-java/pull/36847) #### Other Changes * Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmsoException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index ee8efd6399cc1..d3babd2360fc7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -652,8 +652,6 @@ CosmosPagedFlux readAllItems(CosmosQueryRequestOptions options, Class queryOptionsAccessor.withEmptyPageDiagnosticsEnabled(nonNullOptions, true) : nonNullOptions; - queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions); - pagedFluxOptions.setTracerAndTelemetryInformation( this.readAllItemsSpanName, database.getId(), @@ -956,8 +954,6 @@ Function>> queryItemsInternalFu : nonNullOptions; String spanName = this.queryItemsSpanName; - queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions); - pagedFluxOptions.setTracerAndTelemetryInformation( spanName, database.getId(), @@ -994,7 +990,6 @@ Function>> queryItemsInternalFu queryOptionsAccessor.withEmptyPageDiagnosticsEnabled(nonNullOptions, true) : nonNullOptions; String spanName = this.queryItemsSpanName; - queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions); pagedFluxOptions.setTracerAndTelemetryInformation( spanName, database.getId(), @@ -1084,7 +1079,6 @@ Function>> queryChangeFeedInter CosmosAsyncClient client = this.getDatabase().getClient(); String spanName = this.queryChangeFeedSpanName; - cfOptionsAccessor.applyMaxItemCount(cosmosChangeFeedRequestOptions, pagedFluxOptions); pagedFluxOptions.setTracerAndTelemetryInformation( spanName, database.getId(), @@ -1555,7 +1549,6 @@ public CosmosPagedFlux readAllItems( requestOptions.setPartitionKey(partitionKey); return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions); pagedFluxOptions.setTracerAndTelemetryInformation( this.readAllItemsOfLogicalPartitionSpanName, database.getId(), @@ -2709,6 +2702,25 @@ public Mono> readMany( return cosmosAsyncContainer.readMany(itemIdentityList, requestOptions, classType); } + + @Override + public Function>> queryItemsInternalFunc( + CosmosAsyncContainer cosmosAsyncContainer, + SqlQuerySpec sqlQuerySpec, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + Class classType) { + + return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpec, cosmosQueryRequestOptions, classType); + } + + @Override + public Function>> queryItemsInternalFuncWithMonoSqlQuerySpec( + CosmosAsyncContainer cosmosAsyncContainer, + Mono sqlQuerySpecMono, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + Class classType) { + return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpecMono, cosmosQueryRequestOptions, classType); + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosBridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosBridgeInternal.java index c417e81dfdd87..df52de1060500 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosBridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosBridgeInternal.java @@ -5,17 +5,13 @@ import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.ConnectionPolicy; -import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Warning; import com.azure.cosmos.implementation.query.Transformer; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; -import com.azure.cosmos.models.CosmosQueryRequestOptions; -import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import com.fasterxml.jackson.databind.JsonNode; -import reactor.core.publisher.Mono; import static com.azure.cosmos.implementation.Warning.INTERNAL_USE_ONLY_WARNING; @@ -133,28 +129,6 @@ public static CosmosException cosmosException(int statusCode, Exception innerExc return new CosmosException(statusCode, innerException); } - @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosPagedFlux queryItemsInternal(CosmosAsyncContainer container, - SqlQuerySpec sqlQuerySpec, - CosmosQueryRequestOptions cosmosQueryRequestOptions, - Transformer transformer) { - return UtilBridgeInternal.createCosmosPagedFlux(transformer.transform(container.queryItemsInternalFunc( - sqlQuerySpec, - cosmosQueryRequestOptions, - JsonNode.class))); - } - - @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosPagedFlux queryItemsInternal(CosmosAsyncContainer container, - Mono sqlQuerySpecMono, - CosmosQueryRequestOptions cosmosQueryRequestOptions, - Transformer transformer) { - return UtilBridgeInternal.createCosmosPagedFlux(transformer.transform(container.queryItemsInternalFunc( - sqlQuerySpecMono, - cosmosQueryRequestOptions, - JsonNode.class))); - } - @Warning(value = INTERNAL_USE_ONLY_WARNING) public static CosmosPagedFlux queryChangeFeedInternal( CosmosAsyncContainer container, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index d242fc75e3cfe..e32c975f120a5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -60,6 +60,7 @@ import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PriorityLevel; +import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import com.fasterxml.jackson.databind.JsonNode; @@ -321,7 +322,6 @@ public interface CosmosChangeFeedRequestOptionsAccessor { Function getItemFactoryMethod(CosmosChangeFeedRequestOptions queryRequestOptions, Class classOfT); CosmosChangeFeedRequestOptions setItemFactoryMethod(CosmosChangeFeedRequestOptions queryRequestOptions, Function factoryMethod); CosmosDiagnosticsThresholds getDiagnosticsThresholds(CosmosChangeFeedRequestOptions options); - void applyMaxItemCount(CosmosChangeFeedRequestOptions requestOptions, CosmosPagedFluxOptions fluxOptions); List getExcludeRegions(CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions); } @@ -900,6 +900,18 @@ Mono> readMany( List itemIdentityList, CosmosQueryRequestOptions requestOptions, Class classType); + + Function>> queryItemsInternalFunc( + CosmosAsyncContainer cosmosAsyncContainer, + SqlQuerySpec sqlQuerySpec, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + Class classType); + + Function>> queryItemsInternalFuncWithMonoSqlQuerySpec( + CosmosAsyncContainer cosmosAsyncContainer, + Mono sqlQuerySpecMono, + CosmosQueryRequestOptions cosmosQueryRequestOptions, + Class classType); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java index 6c553a22649dd..4a5cd54e6aacb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Utils.java @@ -589,17 +589,10 @@ public static void setContinuationTokenAndMaxItemCount(CosmosPagedFluxOptions pa if (pagedFluxOptions.getMaxItemCount() != null) { ModelBridgeInternal.setQueryRequestOptionsMaxItemCount(cosmosQueryRequestOptions, pagedFluxOptions.getMaxItemCount()); } else { - ImplementationBridgeHelpers - .CosmosQueryRequestOptionsHelper - .getCosmosQueryRequestOptionsAccessor() - .applyMaxItemCount(cosmosQueryRequestOptions, pagedFluxOptions); - // if query request options also don't have maxItemCount set, apply defaults - if (pagedFluxOptions.getMaxItemCount() == null) { - ModelBridgeInternal.setQueryRequestOptionsMaxItemCount( - cosmosQueryRequestOptions, Constants.Properties.DEFAULT_MAX_PAGE_SIZE); - pagedFluxOptions.setMaxItemCount(Constants.Properties.DEFAULT_MAX_PAGE_SIZE); - } + ModelBridgeInternal.setQueryRequestOptionsMaxItemCount( + cosmosQueryRequestOptions, Constants.Properties.DEFAULT_MAX_PAGE_SIZE); + pagedFluxOptions.setMaxItemCount(Constants.Properties.DEFAULT_MAX_PAGE_SIZE); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java index b8049fab1fc7b..90b09a6748489 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java @@ -595,22 +595,6 @@ public CosmosDiagnosticsThresholds getDiagnosticsThresholds(CosmosChangeFeedRequ return options.thresholds; } - @Override - public void applyMaxItemCount( - CosmosChangeFeedRequestOptions requestOptions, - CosmosPagedFluxOptions fluxOptions) { - - if (requestOptions == null || fluxOptions == null) { - return; - } - - if (fluxOptions.getMaxItemCount() != null) { - return; - } - - fluxOptions.setMaxItemCount(requestOptions.getMaxItemCount()); - } - @Override public List getExcludeRegions(CosmosChangeFeedRequestOptions cosmosChangeFeedRequestOptions) { return cosmosChangeFeedRequestOptions.excludeRegions;