diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index 98825fadd8..f61c2c1417 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -971,15 +971,22 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r try { requestWrapper.retryCount++; if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) { - // Trigger exponential backoff in append loop when request is resent for quota errors - if (requestWrapper.attemptSettings == null) { - requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt(); - } else { - requestWrapper.attemptSettings = - requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings); - } + // Trigger exponential backoff in append loop when request is resent for quota errors. + // createNextAttempt correctly initializes the retry delay; createFirstAttempt does not + // include a positive delay, just 0. + requestWrapper.attemptSettings = + requestWrapper.retryAlgorithm.createNextAttempt( + requestWrapper.attemptSettings == null + ? 
requestWrapper.retryAlgorithm.createFirstAttempt() + : requestWrapper.attemptSettings); requestWrapper.blockMessageSendDeadline = Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis()); + log.info( + "Messages blocked for retry for " + + java.time.Duration.between( + java.time.Instant.now(), requestWrapper.blockMessageSendDeadline) + + " until " + + requestWrapper.blockMessageSendDeadline); } Long offset = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java index a31cc145a6..120e004b7d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java @@ -19,6 +19,8 @@ import com.google.protobuf.AbstractMessage; import io.grpc.ServerServiceDefinition; import io.grpc.Status; +import java.time.Instant; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -128,4 +130,8 @@ public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) { public void setVerifyOffset(boolean verifyOffset) { serviceImpl.setVerifyOffset(verifyOffset); } + + public ArrayList getLatestRequestReceivedInstants() { + return serviceImpl.getLatestRequestReceivedInstants(); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java index 16f3feea3c..abf08bd0e1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java +++ 
b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java @@ -20,6 +20,7 @@ import com.google.rpc.Code; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -73,6 +74,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { private final Map, Boolean> connectionToFirstRequest = new ConcurrentHashMap<>(); private Status failedStatus = Status.ABORTED; + private ArrayList requestReceivedInstants = new ArrayList<>(); /** Class used to save the state of a possible response. */ public static class Response { @@ -111,6 +113,10 @@ public String toString() { } } + public ArrayList getLatestRequestReceivedInstants() { + return requestReceivedInstants; + } + @Override public void getWriteStream( GetWriteStreamRequest request, StreamObserver responseObserver) { @@ -197,6 +203,7 @@ public StreamObserver appendRows( new StreamObserver() { @Override public void onNext(AppendRowsRequest value) { + requestReceivedInstants.add(Instant.now()); recordCount++; requests.add(value); long offset = value.getOffset().getValue(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index ee18e9e68d..6f9c49d427 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -54,6 +54,7 @@ import io.grpc.Status; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -86,12 +87,15 @@ public class StreamWriterTest { private static final String EXPLICIT_STREAM = 
"projects/p/datasets/d1/tables/t1/streams/s1"; private static final String TEST_TRACE_ID = "DATAFLOW:job_id"; private static final int MAX_RETRY_NUM_ATTEMPTS = 3; + private static final long INITIAL_RETRY_MILLIS = 500; + private static final double RETRY_MULTIPLIER = 1.3; + private static final int MAX_RETRY_DELAY_MINUTES = 5; private static final RetrySettings retrySettings = RetrySettings.newBuilder() - .setInitialRetryDelay(Duration.ofMillis(500)) - .setRetryDelayMultiplier(1.1) + .setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS)) + .setRetryDelayMultiplier(RETRY_MULTIPLIER) .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS) - .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5)) + .setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES)) .build(); private FakeScheduledExecutorService fakeExecutor; private FakeBigQueryWrite testBigQueryWrite; @@ -2002,6 +2006,46 @@ public void testExclusiveAppendSuccessAndQuotaErrorRetryMaxRetry() throws Except ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); } + @Test + public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Exception { + testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true); + StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled(); + + testBigQueryWrite.addResponse( + new DummyResponseSupplierWillFailThenSucceed( + new FakeBigQueryWriteImpl.Response(createAppendResponse(0)), + /* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1, + com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build())); + + ApiFuture future = + writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0); + + ExecutionException ex = + assertThrows( + ExecutionException.class, + () -> { + future.get(); + }); + assertEquals( + Status.Code.RESOURCE_EXHAUSTED, + ((StatusRuntimeException) ex.getCause()).getStatus().getCode()); + + ArrayList instants = testBigQueryWrite.getLatestRequestReceivedInstants(); + Instant previousInstant = 
instants.get(0); + // Include initial attempt + assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1); + double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95; + for (int i = 1; i < instants.size(); i++) { + Instant currentInstant = instants.get(i); + double differenceInMillis = + java.time.Duration.between(previousInstant, currentInstant).toMillis(); + assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS); + assertThat(differenceInMillis).isGreaterThan(minExpectedDelay); + minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER; + previousInstant = currentInstant; + } + } + @Test public void testAppendSuccessAndNonRetryableError() throws Exception { StreamWriter writer = getTestStreamWriterRetryEnabled();