Skip to content

Commit

Permalink
Allow for setMaxRetryJobs in BigQueryIO to be configurable (#25224)
Browse files Browse the repository at this point in the history
* Allow for setMaxRetryJobs in BigQueryIO to be configurable

* Allow for setMaxRetryJobs in BigQueryIO to be configurable
  • Loading branch information
dannikay authored Feb 6, 2023
1 parent aee2c84 commit bb582d8
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1914,6 +1914,7 @@ public static <T> Write<T> write() {
.setPropagateSuccessful(true)
.setAutoSchemaUpdate(false)
.setDeterministicRecordIdFn(null)
.setMaxRetryJobs(1000)
.build();
}

Expand Down Expand Up @@ -2044,6 +2045,8 @@ public enum Method {

abstract Boolean getIgnoreInsertIds();

abstract int getMaxRetryJobs();

abstract @Nullable String getKmsKey();

abstract Boolean getOptimizeWrites();
Expand Down Expand Up @@ -2147,6 +2150,8 @@ abstract Builder<T> setAvroSchemaFactory(
@Experimental
abstract Builder<T> setAutoSharding(Boolean autoSharding);

abstract Builder<T> setMaxRetryJobs(int maxRetryJobs);

abstract Builder<T> setPropagateSuccessful(Boolean propagateSuccessful);

abstract Builder<T> setAutoSchemaUpdate(Boolean autoSchemaUpdate);
Expand Down Expand Up @@ -2656,6 +2661,11 @@ public Write<T> withAutoSharding() {
return toBuilder().setAutoSharding(true).build();
}

/**
 * Sets the maximum number of times a failed batch load job is retried.
 *
 * <p>Defaults to 1000. Note: the limit is currently applied only when the input collection is
 * unbounded (streaming); for bounded inputs the default retry behavior is unchanged.
 *
 * @param maxRetryJobs upper bound on batch load job retry attempts
 */
public Write<T> withMaxRetryJobs(int maxRetryJobs) {
  Builder<T> updated = toBuilder().setMaxRetryJobs(maxRetryJobs);
  return updated.build();
}

/**
* If true, it enables the propagation of the successfully inserted TableRows on BigQuery as
* part of the {@link WriteResult} object when using {@link Method#STREAMING_INSERTS}. By
Expand Down Expand Up @@ -3153,7 +3163,7 @@ private <DestinationT> WriteResult continueExpandTyped(
// When running in streaming (unbounded mode) we want to retry failed load jobs
// indefinitely. Failing the bundle is expensive, so we set a fairly high limit on retries.
if (IsBounded.UNBOUNDED.equals(input.isBounded())) {
batchLoads.setMaxRetryJobs(1000);
batchLoads.setMaxRetryJobs(getMaxRetryJobs());
}
batchLoads.setTriggeringFrequency(getTriggeringFrequency());
if (getAutoSharding()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2184,6 +2184,20 @@ public void testWriteValidateFailsWithBatchAutoSharding() {
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
}

/** Verifies that {@code withMaxRetryJobs} round-trips through the builder to the getter. */
@Test
public void testMaxRetryJobs() {
  // Build a representative batch-load write first, then apply the retry limit under test.
  BigQueryIO.Write<TableRow> baseWrite =
      BigQueryIO.writeTableRows()
          .to("dataset.table")
          .withSchema(new TableSchema())
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
          .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
          .withSchemaUpdateOptions(
              EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION));
  BigQueryIO.Write<TableRow> configuredWrite = baseWrite.withMaxRetryJobs(500);
  assertEquals(500, configuredWrite.getMaxRetryJobs());
}

@Test
public void testWritePartitionEmptyData() throws Exception {
long numFiles = 0;
Expand Down

0 comments on commit bb582d8

Please sign in to comment.