Skip to content

Commit

Permalink
alwaysUsePagedFluxMaxItemCount (Azure#36847)
Browse files Browse the repository at this point in the history
* alwaysUsePagedFluxMaxItemCount

---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
  • Loading branch information
xinlian12 and annie-mac authored Sep 23, 2023
1 parent 409a5ac commit b21e8a8
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 166 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.encryption.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
import com.azure.cosmos.implementation.ItemDeserializer;
import com.azure.cosmos.implementation.query.Transformer;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CosmosEncryptionQueryTransformer<T> implements Transformer<T> {
private final Scheduler encryptionScheduler;
private final EncryptionProcessor encryptionProcessor;
private final ItemDeserializer itemDeserializer;
private final Class<T> classType;
private final boolean isChangeFeed;

public CosmosEncryptionQueryTransformer(
Scheduler encryptionScheduler,
EncryptionProcessor encryptionProcessor,
ItemDeserializer itemDeserializer,
Class<T> classType,
Boolean isChangeFeed) {
this.encryptionScheduler = encryptionScheduler;
this.encryptionProcessor = encryptionProcessor;
this.itemDeserializer = itemDeserializer;
this.classType = classType;
this.isChangeFeed = isChangeFeed;
}

@Override
public Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> transform(Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
return queryDecryptionTransformer(this.classType, this.isChangeFeed, func);
}

private <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryDecryptionTransformer(
Class<T> classType,
boolean isChangeFeed,
Function<CosmosPagedFluxOptions, Flux<FeedResponse<JsonNode>>> func) {
return func.andThen(flux ->
flux.publishOn(encryptionScheduler)
.flatMap(
page -> {
boolean useEtagAsContinuation = isChangeFeed;
boolean isNoChangesResponse = isChangeFeed ?
ModelBridgeInternal.getNoChangesFromFeedResponse(page)
: false;
List<Mono<JsonNode>> jsonNodeArrayMonoList =
page.getResults().stream().map(jsonNode -> decryptResponseNode(jsonNode)).collect(Collectors.toList());
return Flux.concat(jsonNodeArrayMonoList).map(
item -> this.itemDeserializer.convert(classType, item)
).collectList().map(itemList -> BridgeInternal.createFeedResponseWithQueryMetrics(itemList,
page.getResponseHeaders(),
BridgeInternal.queryMetricsFromFeedResponse(page),
ModelBridgeInternal.getQueryPlanDiagnosticsContext(page),
useEtagAsContinuation,
isNoChangesResponse,
page.getCosmosDiagnostics())
);
}
)
);
}

Mono<JsonNode> decryptResponseNode(
JsonNode jsonNode) {

if (jsonNode == null) {
return Mono.empty();
}

return this.encryptionProcessor.decryptJsonNode(
jsonNode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.rx;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.TestObject;
import com.azure.cosmos.implementation.FeedResponseListValidator;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.CosmosPagedFlux;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

import java.util.UUID;

public class CosmosReadAllItemsTests extends TestSuiteBase {
private final static int TIMEOUT = 30000;
private CosmosAsyncClient client;
private CosmosAsyncContainer container;

@Factory(dataProvider = "clientBuildersWithSessionConsistency")
public CosmosReadAllItemsTests(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}

@Test(groups = { "query" }, timeOut = 2 * TIMEOUT)
public void readMany_UsePageSizeInPagedFluxOption() {

// first creating few items
String pkValue = UUID.randomUUID().toString();
int itemCount = 10;
for (int i = 0; i < itemCount; i++) {
TestObject testObject = TestObject.create(pkValue);
this.container.createItem(testObject).block();
}

CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();

FeedResponseListValidator<TestObject> validator1 =
new FeedResponseListValidator
.Builder<TestObject>()
.totalSize(itemCount)
.numberOfPages(2)
.build();
CosmosPagedFlux<TestObject> queryObservable1 =
this.container.readAllItems(new PartitionKey(pkValue), cosmosQueryRequestOptions, TestObject.class);

validateQuerySuccess(queryObservable1.byPage(5), validator1, TIMEOUT);

CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();
options.setMaxDegreeOfParallelism(2);

FeedResponseListValidator<TestObject> validator2 =
new FeedResponseListValidator
.Builder<TestObject>()
.totalSize(itemCount)
.numberOfPages(1)
.build();
validateQuerySuccess(queryObservable1.byPage(), validator2, TIMEOUT);
}

@AfterClass(groups = { "query" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
public void afterClass() {
safeClose(client);
}

@BeforeClass(groups = { "query" }, timeOut = SETUP_TIMEOUT)
public void before_TopQueryTests() {
this.client = getClientBuilder().buildAsyncClient();
this.container = getSharedSinglePartitionCosmosContainer(client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,9 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
options.setPartitionKey(new PartitionKey("duplicatePartitionKeyValue"));
CosmosPagedFlux<InternalObjectNode> queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);

int preferredPageSize = 3;
TestSubscriber<FeedResponse<InternalObjectNode>> subscriber = new TestSubscriber<>();
queryObservable.byPage(3).take(1).subscribe(subscriber);
queryObservable.byPage(preferredPageSize).take(1).subscribe(subscriber);

subscriber.awaitTerminalEvent();
subscriber.assertComplete();
Expand All @@ -464,8 +465,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
List<InternalObjectNode> expectedDocs = createdDocuments.stream()
.filter(d -> (StringUtils.equals("duplicatePartitionKeyValue", ModelBridgeInternal.getStringFromJsonSerializable(d,"mypk"))))
.filter(d -> (ModelBridgeInternal.getIntFromJsonSerializable(d,"propScopedPartitionInt") > 2)).collect(Collectors.toList());
Integer maxItemCount = ModelBridgeInternal.getMaxItemCountFromQueryRequestOptions(options);
int expectedPageSize = (expectedDocs.size() + maxItemCount - 1) / maxItemCount;
int expectedPageSize = (expectedDocs.size() + preferredPageSize - 1) / preferredPageSize;

assertThat(expectedDocs).hasSize(10 - 3);

Expand All @@ -481,7 +481,7 @@ public void queryScopedToSinglePartition_StartWithContinuationToken() throws Exc
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();

validateQuerySuccess(queryObservable.byPage(page.getContinuationToken()), validator);
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken(), preferredPageSize), validator);
}

@Test(groups = { "query" }, timeOut = TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ public void queryMetricEquality() throws Exception {
options.setQueryMetricsEnabled(true);
options.setMaxDegreeOfParallelism(0);

int preferredPageSize = 5;

CosmosPagedFlux<InternalObjectNode> queryObservable = createdCollection.queryItems(query, options, InternalObjectNode.class);
List<FeedResponse<InternalObjectNode>> resultList1 = queryObservable.byPage(5).collectList().block();
List<FeedResponse<InternalObjectNode>> resultList1 = queryObservable.byPage(preferredPageSize).collectList().block();

options.setMaxDegreeOfParallelism(4);
CosmosPagedFlux<InternalObjectNode> threadedQueryObs = createdCollection.queryItems(query, options, InternalObjectNode.class);
List<FeedResponse<InternalObjectNode>> resultList2 = threadedQueryObs.byPage().collectList().block();
List<FeedResponse<InternalObjectNode>> resultList2 = threadedQueryObs.byPage(preferredPageSize).collectList().block();

assertThat(resultList1.size()).isEqualTo(resultList2.size());
for(int i = 0; i < resultList1.size(); i++){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void continuationToken() throws Exception {
.allPagesSatisfy(new FeedResponseValidator.Builder<InternalObjectNode>()
.requestChargeGreaterThanOrEqualTo(1.0).build())
.build();
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken()), validator);
validateQuerySuccess(queryObservable.byPage(page.getContinuationToken(), maxItemCount), validator);
}

@Test(groups = { "query" }, timeOut = TIMEOUT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception {

int expectedTotalSize = 20;
int expectedNumberOfPages = 3;
int pageSize = 9;
int[] expectedPageLengths = new int[] { 9, 9, 2 };

for (int i = 0; i < 2; i++) {
Expand All @@ -81,11 +82,24 @@ public void queryDocumentsWithTop(Boolean qmEnabled) throws Exception {

CosmosPagedFlux<InternalObjectNode> queryObservable3 = createdCollection.queryItems("SELECT TOP 20 * from c", options, InternalObjectNode.class);

// validate the pageSize in byPage() will be honored
FeedResponseListValidator<InternalObjectNode> validator3 = new FeedResponseListValidator.Builder<InternalObjectNode>()
.totalSize(expectedTotalSize).numberOfPages(expectedNumberOfPages).pageLengths(expectedPageLengths)
.hasValidQueryMetrics(qmEnabled).build();

validateQuerySuccess(queryObservable3.byPage(), validator3, TIMEOUT);
validateQuerySuccess(queryObservable3.byPage(pageSize), validator3, TIMEOUT);

// validate default value will be used for byPage
FeedResponseListValidator<InternalObjectNode> validator4 =
new FeedResponseListValidator
.Builder<InternalObjectNode>()
.totalSize(expectedTotalSize)
.numberOfPages(1)
.pageLengths(new int[] { expectedTotalSize })
.hasValidQueryMetrics(qmEnabled)
.build();

validateQuerySuccess(queryObservable3.byPage(), validator4, TIMEOUT);

if (i == 0) {
options.setPartitionKey(new PartitionKey(firstPk));
Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#### Bugs Fixed
* Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599).
* Fixed an issue where `pageSize` from `byPage` is not always being honored. This only happens when the same `CosmosQueryRequestOptions` being used through different
requests, and different pageSize being used. See [PR 36847](https://github.com/Azure/azure-sdk-for-java/pull/36847)

#### Other Changes
* Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmsoException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,6 @@ <T> CosmosPagedFlux<T> readAllItems(CosmosQueryRequestOptions options, Class<T>
queryOptionsAccessor.withEmptyPageDiagnosticsEnabled(nonNullOptions, true)
: nonNullOptions;

queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);

pagedFluxOptions.setTracerAndTelemetryInformation(
this.readAllItemsSpanName,
database.getId(),
Expand Down Expand Up @@ -956,8 +954,6 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
: nonNullOptions;
String spanName = this.queryItemsSpanName;

queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);

pagedFluxOptions.setTracerAndTelemetryInformation(
spanName,
database.getId(),
Expand Down Expand Up @@ -994,7 +990,6 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
queryOptionsAccessor.withEmptyPageDiagnosticsEnabled(nonNullOptions, true)
: nonNullOptions;
String spanName = this.queryItemsSpanName;
queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);
pagedFluxOptions.setTracerAndTelemetryInformation(
spanName,
database.getId(),
Expand Down Expand Up @@ -1084,7 +1079,6 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryChangeFeedInter

CosmosAsyncClient client = this.getDatabase().getClient();
String spanName = this.queryChangeFeedSpanName;
cfOptionsAccessor.applyMaxItemCount(cosmosChangeFeedRequestOptions, pagedFluxOptions);
pagedFluxOptions.setTracerAndTelemetryInformation(
spanName,
database.getId(),
Expand Down Expand Up @@ -1555,7 +1549,6 @@ public <T> CosmosPagedFlux<T> readAllItems(
requestOptions.setPartitionKey(partitionKey);

return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
queryOptionsAccessor.applyMaxItemCount(options, pagedFluxOptions);
pagedFluxOptions.setTracerAndTelemetryInformation(
this.readAllItemsOfLogicalPartitionSpanName,
database.getId(),
Expand Down Expand Up @@ -2709,6 +2702,25 @@ public <T> Mono<FeedResponse<T>> readMany(

return cosmosAsyncContainer.readMany(itemIdentityList, requestOptions, classType);
}

@Override
public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFunc(
CosmosAsyncContainer cosmosAsyncContainer,
SqlQuerySpec sqlQuerySpec,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
Class<T> classType) {

return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpec, cosmosQueryRequestOptions, classType);
}

@Override
public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFuncWithMonoSqlQuerySpec(
CosmosAsyncContainer cosmosAsyncContainer,
Mono<SqlQuerySpec> sqlQuerySpecMono,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
Class<T> classType) {
return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpecMono, cosmosQueryRequestOptions, classType);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@

import com.azure.cosmos.implementation.AsyncDocumentClient;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Warning;
import com.azure.cosmos.implementation.query.Transformer;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
import reactor.core.publisher.Mono;

import static com.azure.cosmos.implementation.Warning.INTERNAL_USE_ONLY_WARNING;

Expand Down Expand Up @@ -133,28 +129,6 @@ public static CosmosException cosmosException(int statusCode, Exception innerExc
return new CosmosException(statusCode, innerException);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <T> CosmosPagedFlux<T> queryItemsInternal(CosmosAsyncContainer container,
SqlQuerySpec sqlQuerySpec,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
Transformer<T> transformer) {
return UtilBridgeInternal.createCosmosPagedFlux(transformer.transform(container.queryItemsInternalFunc(
sqlQuerySpec,
cosmosQueryRequestOptions,
JsonNode.class)));
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <T> CosmosPagedFlux<T> queryItemsInternal(CosmosAsyncContainer container,
Mono<SqlQuerySpec> sqlQuerySpecMono,
CosmosQueryRequestOptions cosmosQueryRequestOptions,
Transformer<T> transformer) {
return UtilBridgeInternal.createCosmosPagedFlux(transformer.transform(container.queryItemsInternalFunc(
sqlQuerySpecMono,
cosmosQueryRequestOptions,
JsonNode.class)));
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static <T> CosmosPagedFlux<T> queryChangeFeedInternal(
CosmosAsyncContainer container,
Expand Down
Loading

0 comments on commit b21e8a8

Please sign in to comment.