Skip to content

Commit

Permalink
Client Retry Policy: Adds HTTP timeouts with request-level cross-regi…
Browse files Browse the repository at this point in the history
…on retry (Azure#32450)

* mirror of .net fix - initial commit

* nits

Co-authored-by: Fabian Meiswinkel <fabian@meiswinkel.com>

* nits

Co-authored-by: Fabian Meiswinkel <fabian@meiswinkel.com>

* updated changelog, fixes to tests/address refresh beahvior

* removal of double handling of queryplan, improved changelog message

* added check to see if write requests are safe to retry

* removed handeling of data plane writes

* removed extra test

* Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java

Co-authored-by: Kushagra Thapar <kuthapar@microsoft.com>

* added testing for data plane writes, simplified query plan check

* nits

Co-authored-by: Nalu Tripician <ntripician@microsoft.com>
Co-authored-by: Fabian Meiswinkel <fabian@meiswinkel.com>
Co-authored-by: Kushagra Thapar <kuthapar@microsoft.com>
  • Loading branch information
4 people authored Dec 14, 2022
1 parent acac638 commit fa0869d
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 16 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Fixed issue on noisy `CancellationException` log - See [PR 31882](https://github.com/Azure/azure-sdk-for-java/pull/31882)
* Added `retyrAfterInMs` to `StoreResult` in `CosmosDiagnostics` - See [31219](https://github.com/Azure/azure-sdk-for-java/pull/31219)
* Optimized the `readMany` API to make use of point reads when a single item is requested for a given physical partition - See [PR 31723](https://github.com/Azure/azure-sdk-for-java/pull/31723)
* Added cross region retries for data plane, query plan and metadata requests failed with http timeouts - See [32450](https://github.com/Azure/azure-sdk-for-java/pull/32450)

### 4.39.0 (2022-11-16)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,16 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
} else if (clientException != null &&
WebExceptionUtility.isReadTimeoutException(clientException) &&
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT)) {
// if operationType is QueryPlan / AddressRefresh then just retry
if (this.request.getOperationType() == OperationType.QueryPlan || this.request.isAddressRefresh()) {

boolean canFailoverOnTimeout = canGatewayRequestFailoverOnTimeout(request, clientException);

//if operation is data plane read, metadata read, or query plan it can be retried on a different endpoint.
if(canFailoverOnTimeout) {
return shouldRetryOnEndpointFailureAsync(this.isReadRequest, true, true);
}

// if operationType AddressRefresh then just retry
if (this.request.isAddressRefresh()) {
return shouldRetryQueryPlanAndAddress();
}
} else {
Expand All @@ -152,6 +160,32 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
return this.throttlingRetry.shouldRetry(e);
}

private boolean canGatewayRequestFailoverOnTimeout(RxDocumentServiceRequest request, CosmosException clientException) {
//Query Plan requests
if(request.getOperationType() == OperationType.QueryPlan) {
return true;
}

//Meta data request check
boolean isMetaDataRequest = (request.getOperationType() != OperationType.ExecuteJavaScript
&& request.getResourceType() == ResourceType.StoredProcedure)
|| request.getResourceType() != ResourceType.Document;

//Meta Data Read
if(isMetaDataRequest && request.isReadOnly()) {
return true;
}

//Data Plane Read
if(!isMetaDataRequest
&& !request.isAddressRefresh()
&& request.isReadOnly()) {
return true;
}

return false;
}

private Mono<ShouldRetryResult> shouldRetryQueryPlanAndAddress() {

if (this.queryPlanAddressRefreshCount++ > MAX_QUERY_PLAN_AND_ADDRESS_RETRY_COUNT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,88 @@ public void networkFailureOnRead() throws Exception {
}
}

@Test(groups = "unit")
public void shouldRetryOnGatewayTimeout() throws Exception {
ThrottlingRetryOptions throttlingRetryOptions = new ThrottlingRetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null);

Exception exception = ReadTimeoutException.INSTANCE;
CosmosException cosmosException = BridgeInternal.createCosmosException(null, HttpConstants.StatusCodes.REQUEST_TIMEOUT, exception);
BridgeInternal.setSubStatusCode(cosmosException, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT);

RxDocumentServiceRequest dsr;
Mono<ShouldRetryResult> shouldRetry;

//Data Plane Read
dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.Read, "/dbs/db/colls/col/docs/doc", ResourceType.Document);

clientRetryPolicy.onBeforeSendRequest(dsr);
shouldRetry = clientRetryPolicy.shouldRetry(cosmosException);

validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(Duration.ofMillis(1000))
.build());

//Metadata Read
dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.Read, "/dbs/db/clls/col/docs/doc", ResourceType.Database);

clientRetryPolicy.onBeforeSendRequest(dsr);

shouldRetry = clientRetryPolicy.shouldRetry(cosmosException);

validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(Duration.ofMillis(1000))
.build());

//Query Plan
dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.QueryPlan, "/dbs/db/colls/col/docs/doc", ResourceType.Document);

clientRetryPolicy.onBeforeSendRequest(dsr);

shouldRetry = clientRetryPolicy.shouldRetry(cosmosException);

validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(Duration.ofMillis(1000))
.build());

//Data Plane Write - Should not retry
dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.Create, "/dbs/db/colls/col/docs/doc", ResourceType.Document);

clientRetryPolicy.onBeforeSendRequest(dsr);
shouldRetry = clientRetryPolicy.shouldRetry(cosmosException);

validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());

//Metadata Write - Should not Retry
dsr = RxDocumentServiceRequest.createFromName(mockDiagnosticsClientContext(),
OperationType.Create, "/dbs/db/colls/col/docs/docId", ResourceType.Database);

clientRetryPolicy.onBeforeSendRequest(dsr);

shouldRetry = clientRetryPolicy.shouldRetry(cosmosException);

validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());
}

@Test(groups = "unit")
public void tcpNetworkFailureOnRead() throws Exception {
ThrottlingRetryOptions retryOptions = new ThrottlingRetryOptions();
Expand Down Expand Up @@ -335,7 +417,7 @@ public void httpNetworkFailureOnQueryPlan() throws Exception {
ThrottlingRetryOptions retryOptions = new ThrottlingRetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null);

Expand All @@ -353,20 +435,13 @@ public void httpNetworkFailureOnQueryPlan() throws Exception {
for (int i = 0; i < 10; i++) {
Mono<ShouldRetryResult> shouldRetry = clientRetryPolicy.shouldRetry(cosmosException);

if (i < 3) {
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(Duration.ofMillis(0))
.build());
} else {
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(false)
.build());
}
validateSuccess(shouldRetry, ShouldRetryValidator.builder()
.nullException()
.shouldRetry(true)
.backOfTime(Duration.ofMillis(1000))
.build());

Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(i+1)).markEndpointUnavailableForRead(Mockito.any());
Mockito.verify(endpointManager, Mockito.times(0)).markEndpointUnavailableForWrite(Mockito.any());
}
}
Expand Down

0 comments on commit fa0869d

Please sign in to comment.