Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to specify RetryOptions and BigQueryRetryConfig when create job and waitFor #3398

Merged
merged 12 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public static JobListOption fields(JobField... fields) {
/** Class for specifying table get and create options. */
class JobOption extends Option {

private static final long serialVersionUID = -3111736712316353665L;
private static final long serialVersionUID = -3111736712316353664L;

private JobOption(BigQueryRpc.Option option, Object value) {
super(option, value);
Expand All @@ -578,6 +578,16 @@ public static JobOption fields(JobField... fields) {
return new JobOption(
BigQueryRpc.Option.FIELDS, Helper.selector(JobField.REQUIRED_FIELDS, fields));
}

/** Returns an option to specify the job's BigQuery retry configuration. */
public static JobOption bigQueryRetryConfig(BigQueryRetryConfig bigQueryRetryConfig) {
return new JobOption(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, bigQueryRetryConfig);
}

/** Returns an option to specify the job's retry options. */
public static JobOption retryOptions(RetryOption... options) {
return new JobOption(BigQueryRpc.Option.RETRY_OPTIONS, options);
}
}

/** Class for specifying query results options. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.cloud.Policy;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.RetryOption;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode;
Expand Down Expand Up @@ -415,10 +416,15 @@ public com.google.api.services.bigquery.model.Job call() {
}
}
},
getOptions().getRetrySettings(),
getRetryOptions(optionsMap) != null
? RetryOption.mergeToSettings(
getOptions().getRetrySettings(), getRetryOptions(optionsMap))
: getOptions().getRetrySettings(),
BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER,
getOptions().getClock(),
DEFAULT_RETRY_CONFIG));
getBigQueryRetryConfig(optionsMap) != null
? getBigQueryRetryConfig(optionsMap)
: DEFAULT_RETRY_CONFIG));
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down Expand Up @@ -1628,4 +1634,13 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call()
}
return optionMap;
}

static BigQueryRetryConfig getBigQueryRetryConfig(Map<BigQueryRpc.Option, ?> options) {
return (BigQueryRetryConfig)
options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null);
}

static RetryOption[] getRetryOptions(Map<BigQueryRpc.Option, ?> options) {
return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public boolean isDone() {
Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS));
return job == null || JobStatus.State.DONE.equals(job.getStatus().getState());
}

/**
* Blocks until this job completes its execution, either failing or succeeding. This method
* returns current job's latest information. If the job no longer exists, this method returns
Expand Down Expand Up @@ -238,12 +239,23 @@ public boolean isDone() {
* to complete
*/
public Job waitFor(RetryOption... waitOptions) throws InterruptedException {
return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions);
}

public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
throws InterruptedException {
return waitForInternal(bigQueryRetryConfig, waitOptions);
}

private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions)
throws InterruptedException {
checkNotDryRun("waitFor");
Object completedJobResponse;
if (getConfiguration().getType() == Type.QUERY) {
completedJobResponse =
waitForQueryResults(
RetryOption.mergeToSettings(DEFAULT_JOB_WAIT_SETTINGS, waitOptions),
bigQueryRetryConfig,
DEFAULT_QUERY_WAIT_OPTIONS);
} else {
completedJobResponse =
PhongChuong marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -294,7 +306,9 @@ public TableResult getQueryResults(QueryResultsOption... options)

QueryResponse response =
waitForQueryResults(
DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0]));
DEFAULT_JOB_WAIT_SETTINGS,
DEFAULT_RETRY_CONFIG,
waitOptions.toArray(new QueryResultsOption[0]));

// Get the job resource to determine if it has errored.
Job job = this;
Expand Down Expand Up @@ -334,7 +348,9 @@ public TableResult getQueryResults(QueryResultsOption... options)
}

private QueryResponse waitForQueryResults(
RetrySettings retrySettings, final QueryResultsOption... resultsOptions)
RetrySettings retrySettings,
BigQueryRetryConfig bigQueryRetryConfig,
final QueryResultsOption... resultsOptions)
throws InterruptedException {
if (getConfiguration().getType() != Type.QUERY) {
throw new UnsupportedOperationException(
Expand All @@ -360,7 +376,7 @@ public boolean shouldRetry(
}
},
options.getClock(),
DEFAULT_RETRY_CONFIG);
bigQueryRetryConfig);
} catch (BigQueryRetryHelper.BigQueryRetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ enum Option {
STATE_FILTER("stateFilter"),
TIMEOUT("timeoutMs"),
REQUESTED_POLICY_VERSION("requestedPolicyVersion"),
TABLE_METADATA_VIEW("view");
TABLE_METADATA_VIEW("view"),
RETRY_OPTIONS("retryOptions"),
BIGQUERY_RETRY_CONFIG("bigQueryRetryConfig");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.bigquery.model.*;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.cloud.Policy;
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
import com.google.cloud.Tuple;
import com.google.cloud.bigquery.BigQuery.JobOption;
Expand Down Expand Up @@ -1594,6 +1595,119 @@ public void testCreateJobFailureShouldRetry() {
verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS));
}

@Test
public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() {
// Validate create job with BigQueryRetryConfig that retries on rate limit error message.
JobOption bigQueryRetryConfigOption =
JobOption.bigQueryRetryConfig(
BigQueryRetryConfig.newBuilder()
.retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
.retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
.retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
.build());

Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(
new BigQueryException(
400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG
.thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() {
// Validate create job with BigQueryRetryConfig that does not retry on rate limit error message.
JobOption bigQueryRetryConfigOption =
JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder().build());

Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(400, RATE_LIMIT_ERROR_MSG));

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

try {
((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption);
fail("JobException expected");
} catch (BigQueryException e) {
assertNotNull(e.getMessage());
}
// Verify that getQueryResults is attempted only once and not retried since the error message
// does not match.
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithRetryOptionsFailureShouldRetry() {
// Validate create job with RetryOptions.
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(4));
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
.thenThrow(new BigQueryException(503, "Service Unavailable"))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithRetryOptionsFailureShouldNotRetry() {
// Validate create job with RetryOptions that only attempts once (no retry).
JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1));
Map<BigQueryRpc.Option, ?> bigQueryRpcOptions = optionMap(retryOptions);
when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions)))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenReturn(newJobPb());

bigquery = options.getService();
bigquery =
options
.toBuilder()
.setRetrySettings(ServiceOptions.getDefaultRetrySettings())
.build()
.getService();

try {
((BigQueryImpl) bigquery)
.create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions);
fail("JobException expected");
} catch (BigQueryException e) {
assertNotNull(e.getMessage());
}
verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions));
}

@Test
public void testCreateJobWithSelectedFields() {
when(bigqueryRpcMock.create(
Expand Down
Loading
Loading