From fc8e083a383c5e25bd03eedc83315067647e0580 Mon Sep 17 00:00:00 2001
From: Fabian Meiswinkel
Date: Tue, 5 Dec 2023 17:27:49 +0100
Subject: [PATCH] Fixed a bug resulting in high number of PartitionKeyRange ReadFeed requests when using bulk execution not using Spark connector. (#37920)

* Fixing high number of PKRangeFeed calls when using BulkExecution without SparkConnector

* Adding unit test coverage

* Update CHANGELOG.md
---
 .../java/com/azure/cosmos/FeedRangeTest.java  | 127 ++++++++++++++++++
 .../cosmos/implementation/SessionTest.java    |   2 +-
 sdk/cosmos/azure-cosmos/CHANGELOG.md          |   1 +
 .../azure/cosmos/CosmosAsyncContainer.java    |  11 +-
 .../implementation/AsyncDocumentClient.java   |   3 +-
 .../ImplementationBridgeHelpers.java          |   3 +
 .../implementation/RxDocumentClientImpl.java  |  17 ++-
 .../implementation/RxGatewayStoreModel.java   |  11 ++
 .../implementation/batch/BulkExecutor.java    |   5 +-
 9 files changed, 171 insertions(+), 9 deletions(-)

diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FeedRangeTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FeedRangeTest.java
index 409bd30ff8435..672cfa3813a77 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FeedRangeTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FeedRangeTest.java
@@ -4,6 +4,18 @@
  */
 package com.azure.cosmos;
 
+import com.azure.cosmos.implementation.Configs;
+import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
+import com.azure.cosmos.implementation.OperationType;
+import com.azure.cosmos.implementation.ResourceType;
+import com.azure.cosmos.implementation.RxDocumentClientImpl;
+import com.azure.cosmos.implementation.RxDocumentServiceRequest;
+import com.azure.cosmos.implementation.RxDocumentServiceResponse;
+import com.azure.cosmos.implementation.RxGatewayStoreModel;
+import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
+import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
+import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
+import com.azure.cosmos.models.CosmosContainerIdentity;
 import com.azure.cosmos.models.CosmosContainerProperties;
 import com.azure.cosmos.models.FeedRange;
 import com.azure.cosmos.rx.TestSuiteBase;
@@ -11,9 +23,12 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.assertj.core.api.Assertions.assertThat;
 public class FeedRangeTest extends TestSuiteBase {
@@ -58,4 +73,116 @@ public void feedRange_RecreateContainerWithSameName() {
             }
         }
     }
+
+    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
+    public void feedRange_withForceRefresh() {
+        String containerName = UUID.randomUUID().toString();
+        String databaseName = preExistingDatabaseId;
+        CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(containerName, "/PE_Name");
+        houseKeepingClient.getDatabase(databaseName).createContainerIfNotExists(cosmosContainerProperties);
+        try(CosmosAsyncClient clientUnderTest = cosmosClientBuilderUnderTest.buildAsyncClient()) {
+            RxDocumentClientImpl rxClient = (RxDocumentClientImpl)clientUnderTest.getContextClient();
+            RxGatewayStoreModel rxGatewayStoreModel = (RxGatewayStoreModel)ReflectionUtils.getGatewayProxy(rxClient);
+            DelegatingRxStoreModel pkRangeFeedTrackingGatewayStoreModel = new DelegatingRxStoreModel(rxGatewayStoreModel);
+            ReflectionUtils.setGatewayProxy(rxClient, pkRangeFeedTrackingGatewayStoreModel);
+            for (int i = 0; i < 10; i++) {
+                List<FeedRange> rsp =
+                    clientUnderTest.getDatabase(databaseName).getContainer(containerName).getFeedRanges().block();
+                assertThat(rsp).isNotNull();
+                assertThat(rsp).hasSize(1);
+            }
+            assertThat(pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount()).isGreaterThanOrEqualTo(10);
+        }
+
+        houseKeepingClient.getDatabase(databaseName).getContainer(containerName).delete();
+    }
+
+    @Test(groups = { "emulator" }, timeOut = TIMEOUT)
+    public void feedRange_noForceRefresh() {
+        String containerName = UUID.randomUUID().toString();
+        String databaseName = preExistingDatabaseId;
+        CosmosContainerProperties cosmosContainerProperties = new CosmosContainerProperties(containerName, "/PE_Name");
+        houseKeepingClient.getDatabase(databaseName).createContainerIfNotExists(cosmosContainerProperties);
+        try(CosmosAsyncClient clientUnderTest = cosmosClientBuilderUnderTest.buildAsyncClient()) {
+            RxDocumentClientImpl rxClient = (RxDocumentClientImpl)clientUnderTest.getContextClient();
+            RxGatewayStoreModel rxGatewayStoreModel = (RxGatewayStoreModel)ReflectionUtils.getGatewayProxy(rxClient);
+            DelegatingRxStoreModel pkRangeFeedTrackingGatewayStoreModel = new DelegatingRxStoreModel(rxGatewayStoreModel);
+            ReflectionUtils.setGatewayProxy(rxClient, pkRangeFeedTrackingGatewayStoreModel);
+            clientUnderTest.getDatabase(databaseName).getContainer(containerName).getFeedRanges().block();
+
+            long baselinePkRangeFeedCount = pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount();
+            logger.info("Baseline PKRangeFeedCount: {}", baselinePkRangeFeedCount);
+
+            for (int i = 0; i < 10; i++) {
+                List<FeedRange> rsp =
+                    ImplementationBridgeHelpers
+                        .CosmosAsyncContainerHelper
+                        .getCosmosAsyncContainerAccessor()
+                        .getFeedRanges(
+                            clientUnderTest.getDatabase(databaseName).getContainer(containerName),
+                            false).block();
+                assertThat(rsp).isNotNull();
+                assertThat(rsp).hasSize(1);
+            }
+            assertThat(pkRangeFeedTrackingGatewayStoreModel.getPartitionKeyRangeFeedCount())
+                .isEqualTo(baselinePkRangeFeedCount);
+        }
+
+        houseKeepingClient.getDatabase(databaseName).getContainer(containerName).delete();
+    }
+
+    static class DelegatingRxStoreModel extends RxGatewayStoreModel {
+
+        private final AtomicLong partitionKeyRangeFeedCounter= new AtomicLong(0);
+
+        private final RxGatewayStoreModel inner;
+
+        public DelegatingRxStoreModel(RxGatewayStoreModel inner) {
+            super(inner);
+
+            this.inner = inner;
+        }
+
+        @Override
+        public Mono<RxDocumentServiceResponse> processMessage(RxDocumentServiceRequest request) {
+
+            if (request.getResourceType() == ResourceType.PartitionKeyRange
+                && request.getOperationType() == OperationType.ReadFeed) {
+                partitionKeyRangeFeedCounter.incrementAndGet();
+            }
+
+            return inner.processMessage(request);
+        }
+
+        @Override
+        public void enableThroughputControl(ThroughputControlStore throughputControlStore) {
+            inner.enableThroughputControl(throughputControlStore);
+        }
+
+        @Override
+        public Flux<Void> submitOpenConnectionTasksAndInitCaches(
+            CosmosContainerProactiveInitConfig proactiveContainerInitConfig) {
+
+            return inner.submitOpenConnectionTasksAndInitCaches(proactiveContainerInitConfig);
+        }
+
+        @Override
+        public void configureFaultInjectorProvider(IFaultInjectorProvider injectorProvider, Configs configs) {
+            inner.configureFaultInjectorProvider(injectorProvider, configs);
+        }
+
+        @Override
+        public void recordOpenConnectionsAndInitCachesCompleted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
+            inner.recordOpenConnectionsAndInitCachesCompleted(cosmosContainerIdentities);
+        }
+
+        @Override
+        public void recordOpenConnectionsAndInitCachesStarted(List<CosmosContainerIdentity> cosmosContainerIdentities) {
+            inner.recordOpenConnectionsAndInitCachesStarted(cosmosContainerIdentities);
+        }
+
+        public long getPartitionKeyRangeFeedCount() {
+            return this.partitionKeyRangeFeedCounter.get();
+        }
+    }
 }
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java
index aa720b7387f8e..8663514dde432 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/SessionTest.java
@@ -217,7 +217,7 @@ public void partitionedSessionToken(boolean isNameBased) throws NoSuchMethodExce
 
         // Session token validation for feed ranges query
         spyClient.clearCapturedRequests();
-        List<FeedRange> feedRanges = spyClient.getFeedRanges(getCollectionLink(isNameBased)).block();
+        List<FeedRange> feedRanges = spyClient.getFeedRanges(getCollectionLink(isNameBased), true).block();
         queryRequestOptions = new CosmosQueryRequestOptions();
         queryRequestOptions.setFeedRange(feedRanges.get(0));
         dummyState = TestUtils.createDummyQueryFeedOperationState(
diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 8ff59a177c8f5..bf64b90c58d4e 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -7,6 +7,7 @@
 #### Breaking Changes
 
 #### Bugs Fixed
+* Fixed high number of PKRangeFeed calls when using BulkExecution without SparkConnector - See [PR 37920](https://github.com/Azure/azure-sdk-for-java/pull/37920)
 
 #### Other Changes
 * Changed to `DEBUG` log level in `WebExceptionRetryPolicy` for non-handled exception scenario and retry scenario - See [PR 37918](https://github.com/Azure/azure-sdk-for-java/pull/37918)
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
index 12c351e1b3e77..19956728701c2 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java
@@ -2539,7 +2539,11 @@ ItemDeserializer getItemDeserializer() {
      * @return An unmodifiable list of {@link FeedRange}
      */
     public Mono<List<FeedRange>> getFeedRanges() {
-        return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink());
+        return this.getFeedRanges(true);
+    }
+
+    Mono<List<FeedRange>> getFeedRanges(boolean forceRefresh) {
+        return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink(), forceRefresh);
     }
 
     /**
@@ -2759,6 +2763,11 @@ public <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInt
                     Class<T> classType) {
                     return cosmosAsyncContainer.queryItemsInternalFunc(sqlQuerySpecMono, cosmosQueryRequestOptions, classType);
                 }
+
+                @Override
+                public Mono<List<FeedRange>> getFeedRanges(CosmosAsyncContainer cosmosAsyncContainer, boolean forceRefresh) {
+                    return cosmosAsyncContainer.getFeedRanges(forceRefresh);
+                }
             });
     }
 
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java
index bed912608ea6a..a9a197055f573 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java
@@ -760,9 +760,10 @@ <T> Flux<FeedResponse<T>> queryDocumentChangeFeedFromPagedFlux(
      * Gets the feed ranges of a container.
      *
      * @param collectionLink the link to the parent document collection.
+     * @param forceRefresh a flag indicating whether to force a refresh
      * @return a {@link List} of @{link FeedRange} containing the feed ranges of a container.
      */
-    Mono<List<FeedRange>> getFeedRanges(String collectionLink);
+    Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceRefresh);
 
     /**
      * Creates a stored procedure.
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
index 6b3e7c695a77a..3063e30fe5719 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
@@ -57,6 +57,7 @@
 import com.azure.cosmos.models.CosmosMetricName;
 import com.azure.cosmos.models.CosmosPatchOperations;
 import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedRange;
 import com.azure.cosmos.models.FeedResponse;
 import com.azure.cosmos.models.ModelBridgeInternal;
 import com.azure.cosmos.models.PartitionKey;
@@ -931,6 +932,8 @@ <T> Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> queryItemsInternalFu
                 Mono<SqlQuerySpec> sqlQuerySpecMono,
                 CosmosQueryRequestOptions cosmosQueryRequestOptions,
                 Class<T> classType);
+
+            Mono<List<FeedRange>> getFeedRanges(CosmosAsyncContainer cosmosAsyncContainer, boolean forceRefresh);
         }
     }
 
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
index 9545419febd23..f42c02c755403 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
@@ -5207,7 +5207,7 @@ private static SqlQuerySpec createLogicalPartitionScanQuerySpec(
     }
 
     @Override
-    public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
+    public Mono<List<FeedRange>> getFeedRanges(String collectionLink, boolean forceRefresh) {
         InvalidPartitionExceptionRetryPolicy invalidPartitionExceptionRetryPolicy = new InvalidPartitionExceptionRetryPolicy(
             this.collectionCache,
             null,
@@ -5224,12 +5224,16 @@ public Mono<List<FeedRange>> getFeedRanges(String collectionLink) {
         invalidPartitionExceptionRetryPolicy.onBeforeSendRequest(request);
 
         return ObservableHelper.inlineIfPossibleAsObs(
-            () -> getFeedRangesInternal(request, collectionLink),
+            () -> getFeedRangesInternal(request, collectionLink, forceRefresh),
             invalidPartitionExceptionRetryPolicy);
     }
 
-    private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest request, String collectionLink) {
-        logger.debug("getFeedRange collectionLink=[{}]", collectionLink);
+    private Mono<List<FeedRange>> getFeedRangesInternal(
+        RxDocumentServiceRequest request,
+        String collectionLink,
+        boolean forceRefresh) {
+
+        logger.debug("getFeedRange collectionLink=[{}] - forceRefresh={}", collectionLink, forceRefresh);
 
         if (StringUtils.isEmpty(collectionLink)) {
             throw new IllegalArgumentException("collectionLink");
@@ -5247,7 +5251,10 @@ private Mono<List<FeedRange>> getFeedRangesInternal(RxDocumentServiceRequest req
                 Mono<Utils.ValueHolder<List<PartitionKeyRange>>> valueHolderMono = partitionKeyRangeCache
                     .tryGetOverlappingRangesAsync(
                         BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),
-                        collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null);
+                        collection.getResourceId(),
+                        RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES,
+                        forceRefresh,
+                        null);
 
                 return valueHolderMono.map(partitionKeyRangeList -> toFeedRanges(partitionKeyRangeList, request));
             });
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
index b2be10212381a..0052f32a38b00 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java
@@ -112,6 +112,17 @@ public RxGatewayStoreModel(
         this.sessionContainer = sessionContainer;
     }
 
+    public RxGatewayStoreModel(RxGatewayStoreModel inner) {
+        this.clientContext = inner.clientContext;
+        this.defaultHeaders = inner.defaultHeaders;
+        this.defaultConsistencyLevel = inner.defaultConsistencyLevel;
+        this.globalEndpointManager = inner.globalEndpointManager;
+        this.queryCompatibilityMode = inner.queryCompatibilityMode;
+
+        this.httpClient = inner.httpClient;
+        this.sessionContainer = inner.sessionContainer;
+    }
+
     void setGatewayServiceConfigurationReader(GatewayServiceConfigurationReader gatewayServiceConfigurationReader) {
         this.gatewayServiceConfigurationReader = gatewayServiceConfigurationReader;
     }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java
index 1484d92898fc7..90444895ef251 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java
@@ -289,7 +289,10 @@ private Flux<CosmosBulkOperationResponse<TContext>> executeCore() {
             .getMaxConcurrentCosmosPartitions(cosmosBulkExecutionOptions);
         Mono<Integer> maxConcurrentCosmosPartitionsMono = nullableMaxConcurrentCosmosPartitions != null ?
             Mono.just(Math.max(256, nullableMaxConcurrentCosmosPartitions)) :
-            this.container.getFeedRanges().map(ranges -> Math.max(256, ranges.size() * 2));
+            ImplementationBridgeHelpers
+                .CosmosAsyncContainerHelper
+                .getCosmosAsyncContainerAccessor()
+                .getFeedRanges(this.container, false).map(ranges -> Math.max(256, ranges.size() * 2));
 
         return maxConcurrentCosmosPartitionsMono