Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create Job retry for rate limit exceeded with status code 200 #1744

Merged
merged 10 commits into from
Dec 14, 2021
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies

```Groovy
implementation platform('com.google.cloud:libraries-bom:24.0.0')
implementation platform('com.google.cloud:libraries-bom:24.1.0')

implementation 'com.google.cloud:google-cloud-bigquery'
```
If you are using Gradle without BOM, add this to your dependencies

```Groovy
implementation 'com.google.cloud:google-cloud-bigquery:2.5.0'
implementation 'com.google.cloud:google-cloud-bigquery:2.5.1'
```

If you are using SBT, add this to your dependencies

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.5.0"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquery" % "2.5.1"
```

## Authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,37 +347,46 @@ public JobId get() {

@InternalApi("visible for testing")
Job create(JobInfo jobInfo, Supplier<JobId> idProvider, JobOption... options) {
boolean idRandom = false;
if (jobInfo.getJobId() == null) {
jobInfo = jobInfo.toBuilder().setJobId(idProvider.get()).build();
idRandom = true;
}
final com.google.api.services.bigquery.model.Job jobPb =
jobInfo.setProjectId(getOptions().getProjectId()).toPb();
final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
final boolean idRandom = (jobInfo.getJobId() == null);

final Map<BigQueryRpc.Option, ?> optionsMap = optionMap(options);
BigQueryException createException;
// NOTE(pongad): This double-try structure is admittedly odd.
// translateAndThrow itself throws, and pretends to return an exception only
// so users can pretend to throw.
// This makes it difficult to translate without throwing.
// Fixing this entails some work on BaseServiceException.translate.
// Since that affects a bunch of APIs, we should fix this as a separate change.
final JobId[] finalJobId = new JobId[1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this have to be an array? Can we just use jobId? since we are only storing a single, final jobId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried several approaches and using an array is the only viable option i found. My understanding is that the variable needs to be a final in order to be accessed from within the inner class (BigQueryRetryHelper). If it's a final it can not be modified, therefore the new jobId values cannot be assigned to it

I would really appreciate any suggestions on how to avoid using an array here.

try {
try {
return Job.fromPb(
this,
runWithRetries(
BigQueryRetryHelper.runWithRetries(
new Callable<com.google.api.services.bigquery.model.Job>() {
@Override
public com.google.api.services.bigquery.model.Job call() {
return bigQueryRpc.create(jobPb, optionsMap);
if (idRandom) {
// re-generate a new random job with the same jobInfo when jobId is not
// provided by the user
JobInfo recreatedJobInfo =
jobInfo.toBuilder().setJobId(idProvider.get()).build();
com.google.api.services.bigquery.model.Job newJobPb =
recreatedJobInfo.setProjectId(getOptions().getProjectId()).toPb();
finalJobId[0] = recreatedJobInfo.getJobId();
return bigQueryRpc.create(newJobPb, optionsMap);
} else {
com.google.api.services.bigquery.model.Job jobPb =
jobInfo.setProjectId(getOptions().getProjectId()).toPb();
return bigQueryRpc.create(jobPb, optionsMap);
}
}
},
getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock()));
} catch (RetryHelper.RetryHelperException e) {
EXCEPTION_HANDLER,
getOptions().getClock(),
DEFAULT_RETRY_CONFIG));
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
} catch (BigQueryException e) {
Expand All @@ -396,7 +405,7 @@ public com.google.api.services.bigquery.model.Job call() {
// fetch a job created by someone else.
Job job;
try {
job = getJob(jobInfo.getJobId());
job = getJob(finalJobId[0]);
} catch (BigQueryException e) {
throw createException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public boolean shouldRetry(
// the exception messages
boolean shouldRetry =
(shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(previousThrowable, bigQueryRetryConfig))
|| shouldRetryBasedOnBigQueryRetryConfig(
previousThrowable, bigQueryRetryConfig, previousResponse))
&& shouldRetryBasedOnTiming(context, nextAttemptSettings);

if (LOG.isLoggable(Level.FINEST)) {
Expand All @@ -92,13 +93,26 @@ public boolean shouldRetry(
}

private boolean shouldRetryBasedOnBigQueryRetryConfig(
Throwable previousThrowable, BigQueryRetryConfig bigQueryRetryConfig) {
Throwable previousThrowable,
BigQueryRetryConfig bigQueryRetryConfig,
ResponseT previousResponse) {
/*
We are deciding if a given error should be retried on the basis of error message.
Cannot rely on Error/Status code as for example error code 400 (which is not retriable) could be thrown due to rateLimitExceed, which is retriable
*/
String errorDesc;
if (previousThrowable != null && (errorDesc = previousThrowable.getMessage()) != null) {
String errorDesc = null;
if (previousThrowable != null) {
errorDesc = previousThrowable.getMessage();
} else if (previousResponse != null) {
/*
In some cases error messages may come without an exception
e.g. status code 200 with a rate limit exceeded for job create
in these cases there is now previousThrowable so we need to check previousResponse
*/
errorDesc = previousResponse.toString();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a mock test in BigQueryImplTest to verify the correctness of the behavior; i.e. retrying with status code 200 on rateLimitExceeded error message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added mock test to BigQueryImplTest test is called testCreateJobFailureShouldRetry

}

if (errorDesc != null) {
errorDesc = errorDesc.toLowerCase(); // for case insensitive comparison
for (Iterator<String> retriableMessages =
bigQueryRetryConfig.getRetriableErrorMessages().iterator();
Expand Down Expand Up @@ -161,7 +175,8 @@ public TimedAttemptSettings createNextAttempt(
if (!((shouldRetryBasedOnResult(context, previousThrowable, previousResponse)
|| shouldRetryBasedOnBigQueryRetryConfig(
previousThrowable,
bigQueryRetryConfig)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if
bigQueryRetryConfig,
previousResponse)))) { // Calling shouldRetryBasedOnBigQueryRetryConfig to check if
// the error message could be retried
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,30 @@ public void testCreateJobSuccess() {
verify(bigqueryRpcMock).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobFailureShouldRetry() {
when(bigqueryRpcMock.create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenThrow(
new BigQueryException(
400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG
.thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY));
verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobWithSelectedFields() {
when(bigqueryRpcMock.create(
Expand Down