diff --git a/CHANGES.md b/CHANGES.md
index efa2d07e0..0cd570c0b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -3,6 +3,7 @@
## Next
* Issue #993: Spark ML vector read and write fails
+* PR #1007: Implement at-least-once write option that utilizes the default stream
## 0.31.1 - 2023-06-06
diff --git a/README-template.md b/README-template.md
index f97eb6b7d..a511c5882 100644
--- a/README-template.md
+++ b/README-template.md
@@ -325,11 +325,17 @@ df.writeStream \
The API Supports a number of options to configure the read
-
+
+
- Property |
+ Property |
Meaning |
- Usage |
+ Usage |
table
@@ -532,6 +538,17 @@ The API Supports a number of options to configure the read
   </td>
   <td>Write</td>
  </tr>
+  <tr valign="top">
+   <td><code>writeAtLeastOnce</code>
+   </td>
+   <td>Guarantees that data is written to BigQuery at least once. This is a weaker
+       guarantee than exactly-once, and is suitable for streaming scenarios in which
+       data is continuously being written in small batches.
+       <br/>(Optional. Defaults to <code>false</code>)
+       <br/><i>Supported only by the `DIRECT` write method.</i>
+   </td>
+   <td>Write</td>
+  </tr>
  <tr valign="top">
   <td><code>temporaryGcsBucket</code>
   </td>
@@ -1139,6 +1156,25 @@ You can manually set the number of partitions with the `maxParallelism` property
You can also always repartition after reading in Spark.
+### I get quota exceeded errors while writing
+
+If there are too many partitions, the CreateWriteStream or Throughput [quotas](https://cloud.google.com/bigquery/quotas#write-api-limits)
+may be exceeded. This occurs because, while the data within each partition is processed serially, independent
+partitions may be processed in parallel on different nodes within the Spark cluster. Generally, to ensure maximum
+sustained throughput you should file a quota increase request. However, you can also mitigate the problem by
+reducing the number of partitions being written, by calling `coalesce` on the DataFrame:
+
+```python
+desiredPartitionCount = 5
+dfNew = df.coalesce(desiredPartitionCount)
+# then write dfNew as usual; the destination name below is illustrative
+dfNew.write.format("bigquery").save("dataset.table")
+```
+
+A rule of thumb is to have a single partition handle at least 1GB of data.
+
+Also note that a job running with the `writeAtLeastOnce` property enabled will not encounter CreateWriteStream
+quota errors, since it writes through the default stream instead of creating a write-stream per partition.
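+
+For example, here is a minimal sketch of a direct-mode write with at-least-once semantics
+(the destination table name is illustrative):
+
+```python
+df.write \
+  .format("bigquery") \
+  .option("writeMethod", "direct") \
+  .option("writeAtLeastOnce", "true") \
+  .save("dataset.table")
+```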
+
### How do I authenticate outside GCE / Dataproc?
The connector needs an instance of a GoogleCredentials in order to connect to the BigQuery APIs. There are multiple
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
index 5361a1aa8..966bade06 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java
@@ -208,6 +208,23 @@ public boolean deleteTable(TableId tableId) {
return bigQuery.delete(tableId);
}
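+ /**
+  * Copies all the data from the source table into the destination table, by running a
+  * {@code SELECT *} query job with the given write disposition ({@code WRITE_TRUNCATE} for
+  * overwrite, {@code WRITE_APPEND} for append).
+  */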
+ private Job copyData(
+ TableId sourceTableId,
+ TableId destinationTableId,
+ JobInfo.WriteDisposition writeDisposition) {
+ String queryFormat = "SELECT * FROM `%s`";
+ QueryJobConfiguration queryConfig =
+ jobConfigurationFactory
+ .createQueryJobConfigurationBuilder(
+ sqlFromFormat(queryFormat, sourceTableId), Collections.emptyMap())
+ .setUseLegacySql(false)
+ .setDestinationTable(destinationTableId)
+ .setWriteDisposition(writeDisposition)
+ .build();
+
+ return create(JobInfo.newBuilder(queryConfig).build());
+ }
+
/**
* Overwrites the given destination table, with all the data from the given temporary table,
* transactionally.
@@ -219,28 +236,25 @@ public boolean deleteTable(TableId tableId) {
*/
public Job overwriteDestinationWithTemporary(
TableId temporaryTableId, TableId destinationTableId) {
- String queryFormat =
- "MERGE `%s`\n"
- + "USING (SELECT * FROM `%s`)\n"
- + "ON FALSE\n"
- + "WHEN NOT MATCHED THEN INSERT ROW\n"
- + "WHEN NOT MATCHED BY SOURCE THEN DELETE";
-
- QueryJobConfiguration queryConfig =
- jobConfigurationFactory
- .createQueryJobConfigurationBuilder(
- sqlFromFormat(queryFormat, destinationTableId, temporaryTableId),
- Collections.emptyMap())
- .setUseLegacySql(false)
- .build();
+ return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_TRUNCATE);
+ }
- return create(JobInfo.newBuilder(queryConfig).build());
+ /**
+ * Appends all the data from the given temporary table, to the given destination table,
+ * transactionally.
+ *
+ * @param temporaryTableId The {@code TableId} representing the temporary-table.
+ * @param destinationTableId The {@code TableId} representing the destination table.
+ * @return The {@code Job} object representing this operation (which can be tracked to wait until
+ * it has finished successfully).
+ */
+ public Job appendDestinationWithTemporary(TableId temporaryTableId, TableId destinationTableId) {
+ return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_APPEND);
}
- String sqlFromFormat(String queryFormat, TableId destinationTableId, TableId temporaryTableId) {
- String destinationTableName = fullTableName(destinationTableId);
+ String sqlFromFormat(String queryFormat, TableId temporaryTableId) {
String temporaryTableName = fullTableName(temporaryTableId);
- return String.format(queryFormat, destinationTableName, temporaryTableName);
+ return String.format(queryFormat, temporaryTableName);
}
/**
diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java
index 6d08f6a64..4905661c5 100644
--- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java
+++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java
@@ -52,6 +52,8 @@ public class BigQueryDirectDataWriterHelper {
private final ProtoSchema protoSchema;
private final RetrySettings retrySettings;
private final Optional<String> traceId;
+ private final int partitionId;
+ private final boolean writeAtLeastOnce;
private String writeStreamName;
private StreamWriter streamWriter;
@@ -66,18 +68,26 @@ public BigQueryDirectDataWriterHelper(
String tablePath,
ProtoSchema protoSchema,
RetrySettings bigqueryDataWriterHelperRetrySettings,
- Optional<String> traceId) {
+ Optional<String> traceId,
+ int partitionId,
+ boolean writeAtLeastOnce) {
this.writeClient = writeClientFactory.getBigQueryWriteClient();
this.tablePath = tablePath;
this.protoSchema = protoSchema;
this.retrySettings = bigqueryDataWriterHelperRetrySettings;
this.traceId = traceId;
-
- try {
- this.writeStreamName = retryCreateWriteStream();
- } catch (ExecutionException | InterruptedException e) {
- throw new BigQueryConnectorException(
- "Could not create write-stream after multiple retries", e);
+ this.partitionId = partitionId;
+ this.writeAtLeastOnce = writeAtLeastOnce;
+
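+ // At-least-once writes go through the table's built-in default stream, which requires no
+ // explicit creation, finalization, or batch commit.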
+ if (writeAtLeastOnce) {
+ this.writeStreamName = this.tablePath + "/_default";
+ } else {
+ try {
+ this.writeStreamName = retryCreateWriteStream();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new BigQueryConnectorException(
+ "Could not create write-stream after multiple retries", e);
+ }
}
this.streamWriter = createStreamWriter(this.writeStreamName);
this.protoRows = ProtoRows.newBuilder();
@@ -181,7 +191,7 @@ public void addRow(ByteString message) throws IOException {
* (deduplication error) or if the response contains an error.
*/
private void sendAppendRowsRequest() throws IOException {
- long offset = writeStreamRowCount;
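+ // The default stream does not track offsets; -1 leaves the append request without an
+ // offset, trading exactly-once deduplication for at-least-once semantics.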
+ long offset = this.writeAtLeastOnce ? -1 : writeStreamRowCount;
ApiFuture<AppendRowsResponse> appendRowsResponseApiFuture =
streamWriter.append(protoRows.build(), offset);
@@ -216,14 +226,16 @@ private void validateAppendRowsResponse(
"Append request failed with error: " + appendRowsResponse.getError().getMessage());
}
- AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
- long responseOffset = appendResult.getOffset().getValue();
+ if (!this.writeAtLeastOnce) {
+ AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
+ long responseOffset = appendResult.getOffset().getValue();
- if (expectedOffset != responseOffset) {
- throw new IOException(
- String.format(
- "On stream %s append-rows response, offset %d did not match expected offset %d",
- writeStreamName, responseOffset, expectedOffset));
+ if (expectedOffset != responseOffset) {
+ throw new IOException(
+ String.format(
+ "On stream %s append-rows response, offset %d did not match expected offset %d",
+ writeStreamName, responseOffset, expectedOffset));
+ }
}
}
@@ -241,25 +253,31 @@ public long finalizeStream() throws IOException {
if (this.protoRows.getSerializedRowsCount() != 0) {
sendAppendRowsRequest();
}
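+ // The default stream is never finalized, so fall back to the locally tracked row count.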
+ long responseFinalizedRowCount = writeStreamRowCount;
- waitBeforeFinalization();
+ if (!this.writeAtLeastOnce) {
+ waitBeforeFinalization();
- FinalizeWriteStreamRequest finalizeWriteStreamRequest =
- FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
- FinalizeWriteStreamResponse finalizeResponse =
- retryFinalizeWriteStream(finalizeWriteStreamRequest);
+ FinalizeWriteStreamRequest finalizeWriteStreamRequest =
+ FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
+ FinalizeWriteStreamResponse finalizeResponse =
+ retryFinalizeWriteStream(finalizeWriteStreamRequest);
- long expectedFinalizedRowCount = writeStreamRowCount;
- long responseFinalizedRowCount = finalizeResponse.getRowCount();
- if (responseFinalizedRowCount != expectedFinalizedRowCount) {
- throw new IOException(
- String.format(
- "On stream %s finalization, expected finalized row count %d but received %d",
- writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
+ long expectedFinalizedRowCount = writeStreamRowCount;
+ responseFinalizedRowCount = finalizeResponse.getRowCount();
+ if (responseFinalizedRowCount != expectedFinalizedRowCount) {
+ throw new IOException(
+ String.format(
+ "On stream %s finalization, expected finalized row count %d but received %d",
+ writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
+ }
}
logger.debug(
- "Write-stream {} finalized with row-count {}", writeStreamName, responseFinalizedRowCount);
+ "Write-stream {} with name {} finalized with row-count {}",
+ partitionId,
+ writeStreamName,
+ responseFinalizedRowCount);
clean();
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
index 5409c87d2..6389e06a6 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java
@@ -116,6 +116,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
public static final String ENABLE_LIST_INFERENCE = "enableListInference";
public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat";
public static final String WRITE_METHOD_PARAM = "writeMethod";
+ public static final String WRITE_AT_LEAST_ONCE_OPTION = "writeAtLeastOnce";
@VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW;
@VisibleForTesting
@@ -205,6 +206,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
+ boolean writeAtLeastOnce = false;
// for V2 write with BigQuery Storage Write API
RetrySettings bigqueryDataWriteHelperRetrySettings =
RetrySettings.newBuilder().setMaxAttempts(5).build();
@@ -372,6 +374,8 @@ public static SparkBigQueryConfig from(
getAnyOption(globalOptions, options, WRITE_METHOD_PARAM)
.transform(WriteMethod::from)
.or(writeMethodDefault);
+ config.writeAtLeastOnce =
+ getAnyBooleanOption(globalOptions, options, WRITE_AT_LEAST_ONCE_OPTION, false);
boolean validateSparkAvro =
config.writeMethod == WriteMethod.INDIRECT
@@ -943,6 +947,10 @@ public WriteMethod getWriteMethod() {
return writeMethod;
}
+ public boolean isWriteAtLeastOnce() {
+ return writeAtLeastOnce;
+ }
+
public Optional<String> getTraceId() {
return traceId.toJavaUtil();
}
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java
index 3b5ca50e9..119504d2a 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java
@@ -92,7 +92,7 @@ public void insert(Dataset<Row> data, boolean overwrite) {
throw new BigQueryConnectorException(
String.format(
"It seems that %s out of %s partitions have failed, aborting",
- numPartitions - writerCommitMessages.length, writerCommitMessages.length));
+ numPartitions - writerCommitMessages.length, numPartitions));
}
}
} catch (Exception e) {
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java
index c173838d8..5c25fef6d 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java
@@ -73,7 +73,8 @@ public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContex
tableConfig.getEnableModeCheckForSchemaFields(),
tableConfig.getBigQueryTableLabels(),
SchemaConvertersConfiguration.from(tableConfig),
- tableConfig.getKmsKeyName()); // needs to be serializable
+ tableConfig.getKmsKeyName(), // needs to be serializable
+ tableConfig.isWriteAtLeastOnce());
}
@Singleton
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java
index 9d074d435..6ef480769 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java
@@ -61,12 +61,14 @@ public class BigQueryDirectDataSourceWriterContext implements DataSourceWriterCo
private final String tablePathForBigQueryStorage;
private final SchemaConvertersConfiguration schemaConvertersConfiguration;
private final Optional<String> destinationTableKmsKeyName;
+ private final boolean writeAtLeastOnce;
private BigQueryWriteClient writeClient;
private Optional<TableInfo> tableInfo = Optional.absent();
enum WritingMode {
IGNORE_INPUTS,
+ APPEND_AT_LEAST_ONCE,
OVERWRITE,
ALL_ELSE
}
@@ -85,7 +87,8 @@ public BigQueryDirectDataSourceWriterContext(
boolean enableModeCheckForSchemaFields,
ImmutableMap<String, String> tableLabels,
SchemaConvertersConfiguration schemaConvertersConfiguration,
- java.util.Optional<String> destinationTableKmsKeyName)
+ java.util.Optional<String> destinationTableKmsKeyName,
+ boolean writeAtLeastOnce)
throws IllegalArgumentException {
this.bigQueryClient = bigQueryClient;
this.writeClientFactory = bigQueryWriteClientFactory;
@@ -98,6 +101,7 @@ public BigQueryDirectDataSourceWriterContext(
this.tableLabels = tableLabels;
this.schemaConvertersConfiguration = schemaConvertersConfiguration;
this.destinationTableKmsKeyName = Optional.fromJavaUtil(destinationTableKmsKeyName);
+ this.writeAtLeastOnce = writeAtLeastOnce;
Schema bigQuerySchema =
SchemaConverters.from(this.schemaConvertersConfiguration).toBigQuerySchema(sparkSchema);
try {
@@ -142,6 +146,12 @@ private BigQueryTable getOrCreateTable(
"Destination table's schema is not compatible with dataframe's schema"));
switch (saveMode) {
case Append:
+ if (writeAtLeastOnce) {
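+ // At-least-once appends are staged in a temporary table via the default stream; the rows
+ // are copied into the destination table at commit time.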
+ writingMode = WritingMode.APPEND_AT_LEAST_ONCE;
+ return new BigQueryTable(
+ bigQueryClient.createTempTable(destinationTableId, bigQuerySchema).getTableId(),
+ true);
+ }
break;
case Overwrite:
writingMode = WritingMode.OVERWRITE;
@@ -177,7 +187,8 @@ public DataWriterContextFactory createWriterContextFactory() {
protoSchema,
writingMode.equals(WritingMode.IGNORE_INPUTS),
bigqueryDataWriterHelperRetrySettings,
- traceId);
+ traceId,
+ writeAtLeastOnce);
}
@Override
@@ -203,29 +214,35 @@ public void commit(WriterCommitMessageContext[] messages) {
writeUUID,
Arrays.toString(messages));
- BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest =
- BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage);
- for (WriterCommitMessageContext message : messages) {
- batchCommitWriteStreamsRequest.addWriteStreams(
- ((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName());
- }
- BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
- writeClient.batchCommitWriteStreams(batchCommitWriteStreamsRequest.build());
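+ // Only explicitly created write-streams need a batch commit; rows appended to the
+ // default stream become visible as soon as each append succeeds.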
+ if (!writeAtLeastOnce) {
+ BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest =
+ BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage);
+ for (WriterCommitMessageContext message : messages) {
+ batchCommitWriteStreamsRequest.addWriteStreams(
+ ((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName());
+ }
+ BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
+ writeClient.batchCommitWriteStreams(batchCommitWriteStreamsRequest.build());
- if (!batchCommitWriteStreamsResponse.hasCommitTime()) {
- throw new BigQueryConnectorException(
- "DataSource writer failed to batch commit its BigQuery write-streams");
- }
+ if (!batchCommitWriteStreamsResponse.hasCommitTime()) {
+ throw new BigQueryConnectorException(
+ "DataSource writer failed to batch commit its BigQuery write-streams");
+ }
- logger.info(
- "BigQuery DataSource writer has committed at time: {}",
- batchCommitWriteStreamsResponse.getCommitTime());
+ logger.info(
+ "BigQuery DataSource writer has committed at time: {}",
+ batchCommitWriteStreamsResponse.getCommitTime());
+ }
- if (writingMode.equals(WritingMode.OVERWRITE)) {
- Job overwriteJob =
- bigQueryClient.overwriteDestinationWithTemporary(
- tableToWrite.getTableId(), destinationTableId);
- BigQueryClient.waitForJob(overwriteJob);
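+ // In either mode the rows were staged in a temporary table; copy them into the
+ // destination with the matching write disposition, then drop the temporary table.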
+ if (writingMode.equals(WritingMode.APPEND_AT_LEAST_ONCE)
+ || writingMode.equals(WritingMode.OVERWRITE)) {
+ Job queryJob =
+ (writingMode.equals(WritingMode.OVERWRITE))
+ ? bigQueryClient.overwriteDestinationWithTemporary(
+ tableToWrite.getTableId(), destinationTableId)
+ : bigQueryClient.appendDestinationWithTemporary(
+ tableToWrite.getTableId(), destinationTableId);
+ BigQueryClient.waitForJob(queryJob);
Preconditions.checkState(
bigQueryClient.deleteTable(tableToWrite.getTableId()),
new BigQueryConnectorException(
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java
index c74979ab4..811131fd9 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java
@@ -57,7 +57,8 @@ public BigQueryDirectDataWriterContext(
StructType sparkSchema,
ProtoSchema protoSchema,
RetrySettings bigqueryDataWriterHelperRetrySettings,
- Optional<String> traceId) {
+ Optional<String> traceId,
+ boolean writeAtLeastOnce) {
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
@@ -76,7 +77,9 @@ public BigQueryDirectDataWriterContext(
tablePath,
protoSchema,
bigqueryDataWriterHelperRetrySettings,
- traceId);
+ traceId,
+ partitionId,
+ writeAtLeastOnce);
}
@Override
diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java
index 6284681aa..5b9687744 100644
--- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java
+++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java
@@ -31,6 +31,7 @@ public class BigQueryDirectDataWriterContextFactory
private final boolean ignoreInputs;
private final RetrySettings bigqueryDataWriterHelperRetrySettings;
private final Optional<String> traceId;
+ private final boolean writeAtLeastOnce;
public BigQueryDirectDataWriterContextFactory(
BigQueryClientFactory writeClientFactory,
@@ -39,7 +40,8 @@ public BigQueryDirectDataWriterContextFactory(
ProtoSchema protoSchema,
boolean ignoreInputs,
RetrySettings bigqueryDataWriterHelperRetrySettings,
- Optional<String> traceId) {
+ Optional<String> traceId,
+ boolean writeAtLeastOnce) {
this.writeClientFactory = writeClientFactory;
this.tablePath = tablePath;
this.sparkSchema = sparkSchema;
@@ -47,6 +49,7 @@ public BigQueryDirectDataWriterContextFactory(
this.ignoreInputs = ignoreInputs;
this.bigqueryDataWriterHelperRetrySettings = bigqueryDataWriterHelperRetrySettings;
this.traceId = traceId;
+ this.writeAtLeastOnce = writeAtLeastOnce;
}
/**
@@ -75,6 +78,7 @@ public DataWriterContext createDataWriterContext(
sparkSchema,
protoSchema,
bigqueryDataWriterHelperRetrySettings,
- traceId);
+ traceId,
+ writeAtLeastOnce);
}
}
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
index d02dc75fe..ed11859fc 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java
@@ -161,11 +161,17 @@ private StandardTableDefinition testPartitionedTableDefinition() {
return bq.getTable(testDataset.toString(), testTable + "_partitioned").getDefinition();
}
- protected void writeToBigQuery(Dataset<Row> df, SaveMode mode) {
- writeToBigQuery(df, mode, "avro");
+ protected void writeToBigQueryAvroFormat(
+ Dataset<Row> df, SaveMode mode, String writeAtLeastOnce) {
+ writeToBigQuery(df, mode, "avro", writeAtLeastOnce);
}
protected void writeToBigQuery(Dataset<Row> df, SaveMode mode, String format) {
+ writeToBigQuery(df, mode, format, "False");
+ }
+
+ protected void writeToBigQuery(
+ Dataset<Row> df, SaveMode mode, String format, String writeAtLeastOnce) {
df.write()
.format("bigquery")
.mode(mode)
@@ -173,6 +179,7 @@ protected void writeToBigQuery(Dataset df, SaveMode mode, String format) {
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("intermediateFormat", format)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.save();
}
@@ -185,20 +192,30 @@ Dataset<Row> readAllTypesTable() {
.load();
}
- @Test
- public void testWriteToBigQuery_AppendSaveMode() throws InterruptedException {
+ private void writeToBigQuery_AppendSaveMode_Internal(String writeAtLeastOnce)
+ throws InterruptedException {
// initial write
- writeToBigQuery(initialData(), SaveMode.Append);
+ writeToBigQueryAvroFormat(initialData(), SaveMode.Append, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isTrue();
// second write
- writeToBigQuery(additonalData(), SaveMode.Append);
+ writeToBigQueryAvroFormat(additonalData(), SaveMode.Append, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(4);
assertThat(additionalDataValuesExist()).isTrue();
}
@Test
- public void testWriteToBigQuery_WithTableLabels() {
+ public void testWriteToBigQuery_AppendSaveMode() throws InterruptedException {
+ writeToBigQuery_AppendSaveMode_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuery_AppendSaveMode_AtLeastOnce() throws InterruptedException {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuery_AppendSaveMode_Internal("True");
+ }
+
+ private void writeToBigQuery_WithTableLabels_Internal(String writeAtLeastOnce) {
Dataset<Row> df = initialData();
df.write()
@@ -208,6 +225,7 @@ public void testWriteToBigQuery_WithTableLabels() {
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("intermediateFormat", "avro")
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.option("bigQueryTableLabel.alice", "bob")
.option("bigQueryTableLabel.foo", "bar")
.save();
@@ -220,7 +238,18 @@ public void testWriteToBigQuery_WithTableLabels() {
}
@Test
- public void testWriteToBigQuery_EnableListInference() throws InterruptedException {
+ public void testWriteToBigQuery_WithTableLabels() {
+ writeToBigQuery_WithTableLabels_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuery_WithTableLabels_AtLeastOnce() {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuery_WithTableLabels_Internal("True");
+ }
+
+ private void writeToBigQuery_EnableListInference_Internal(String writeAtLeastOnce)
+ throws InterruptedException {
Dataset<Row> df = initialData();
df.write()
.format("bigquery")
@@ -229,6 +258,7 @@ public void testWriteToBigQuery_EnableListInference() throws InterruptedException {
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("intermediateFormat", "parquet")
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.option("enableListInference", true)
.save();
@@ -246,49 +276,87 @@ public void testWriteToBigQuery_EnableListInference() throws InterruptedException {
}
@Test
- public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedException {
+ public void testWriteToBigQuery_EnableListInference() throws InterruptedException {
+ writeToBigQuery_EnableListInference_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuery_EnableListInference_AtLeastOnce() throws InterruptedException {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuery_EnableListInference_Internal("True");
+ }
+
+ private void writeToBigQuery_ErrorIfExistsSaveMode_Internal(String writeAtLeastOnce)
+ throws InterruptedException {
// initial write
- writeToBigQuery(initialData(), SaveMode.ErrorIfExists);
+ writeToBigQueryAvroFormat(initialData(), SaveMode.ErrorIfExists, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isTrue();
assertThrows(
expectedExceptionOnExistingTable,
- () -> writeToBigQuery(additonalData(), SaveMode.ErrorIfExists));
+ () -> writeToBigQueryAvroFormat(additonalData(), SaveMode.ErrorIfExists, writeAtLeastOnce));
}
@Test
- public void testWriteToBigQuery_IgnoreSaveMode() throws InterruptedException {
+ public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedException {
+ writeToBigQuery_ErrorIfExistsSaveMode_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuery_ErrorIfExistsSaveMode_AtLeastOnce() throws InterruptedException {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuery_ErrorIfExistsSaveMode_Internal("True");
+ }
+
+ private void writeToBigQuery_IgnoreSaveMode_Internal(String writeAtLeastOnce)
+ throws InterruptedException {
// initial write
- writeToBigQuery(initialData(), SaveMode.Ignore);
+ writeToBigQueryAvroFormat(initialData(), SaveMode.Ignore, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isTrue();
// second write
- writeToBigQuery(additonalData(), SaveMode.Ignore);
+ writeToBigQueryAvroFormat(additonalData(), SaveMode.Ignore, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isTrue();
assertThat(additionalDataValuesExist()).isFalse();
}
@Test
- public void testWriteToBigQuery_OverwriteSaveMode() throws InterruptedException {
+ public void testWriteToBigQuery_IgnoreSaveMode() throws InterruptedException {
+ writeToBigQuery_IgnoreSaveMode_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuery_IgnoreSaveMode_AtLeastOnce() throws InterruptedException {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuery_IgnoreSaveMode_Internal("True");
+ }
+
+ private void writeToBigQuery_OverwriteSaveMode_Internal(String writeAtLeastOnce)
+ throws InterruptedException {
// initial write
- writeToBigQuery(initialData(), SaveMode.Overwrite);
+ writeToBigQueryAvroFormat(initialData(), SaveMode.Overwrite, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isTrue();
- // Adding a two minute cushion as the data takes some time to move from buffer to the actual
- // table. Without this cushion, get the following error:
- // "UPDATE or DELETE statement over {DestinationTable} would affect rows in the streaming
- // buffer, which is not supported"
- Thread.sleep(120 * 1000);
-
// second write
- writeToBigQuery(additonalData(), SaveMode.Overwrite);
+ writeToBigQueryAvroFormat(additonalData(), SaveMode.Overwrite, writeAtLeastOnce);
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isFalse();
assertThat(additionalDataValuesExist()).isTrue();
}
+ @Test
+ public void testWriteToBigQuery_OverwriteSaveMode() throws InterruptedException {
+ writeToBigQuery_OverwriteSaveMode_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuery_OverwriteSaveMode_AtLeastOnce() throws InterruptedException {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuery_OverwriteSaveMode_Internal("True");
+ }
+
@Test
public void testWriteToBigQuery_AvroFormat() throws InterruptedException {
writeToBigQuery(initialData(), SaveMode.ErrorIfExists, "avro");
@@ -296,18 +364,30 @@ public void testWriteToBigQuery_AvroFormat() throws InterruptedException {
assertThat(initialDataValuesExist()).isTrue();
}
- @Test
- public void testWriteToBigQuerySimplifiedApi() throws InterruptedException {
+ private void writeToBigQuerySimplifiedApi_Internal(String writeAtLeastOnce)
+ throws InterruptedException {
initialData()
.write()
.format("bigquery")
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.save(fullTableName());
assertThat(testTableNumberOfRows()).isEqualTo(2);
assertThat(initialDataValuesExist()).isTrue();
}
+ @Test
+ public void testWriteToBigQuerySimplifiedApi() throws InterruptedException {
+ writeToBigQuerySimplifiedApi_Internal("False");
+ }
+
+ @Test
+ public void testWriteToBigQuerySimplifiedApi_AtLeastOnce() throws InterruptedException {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeToBigQuerySimplifiedApi_Internal("True");
+ }
+
@Test
public void testWriteToBigQueryAddingTheSettingsToSparkConf() throws InterruptedException {
spark.conf().set("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET);
@@ -321,8 +401,8 @@ public void testWriteToBigQueryAddingTheSettingsToSparkConf() throws InterruptedException {
assertThat(initialDataValuesExist()).isTrue();
}
- @Test
- public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception {
+ private void directWriteToBigQueryWithDiffInSchema_Internal(String writeAtLeastOnce)
+ throws Exception {
assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);
int numOfRows = testTableNumberOfRows(destTableName);
@@ -338,13 +418,25 @@ public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception {
.format("bigquery")
.mode(SaveMode.Append)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.save(testDataset + "." + destTableName);
numOfRows = testTableNumberOfRows(destTableName);
assertThat(numOfRows).isEqualTo(1);
}
@Test
- public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception {
+ public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception {
+ directWriteToBigQueryWithDiffInSchema_Internal("False");
+ }
+
+ @Test
+ public void testDirectWriteToBigQueryWithDiffInSchema_AtLeastOnce() throws Exception {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ directWriteToBigQueryWithDiffInSchema_Internal("True");
+ }
+
+ private void directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal(
+ String writeAtLeastOnce) throws Exception {
assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);
Dataset<Row> df =
@@ -358,6 +450,7 @@ public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception {
.format("bigquery")
.mode(SaveMode.Append)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.option("enableModeCheckForSchemaFields", false)
.save(testDataset + "." + destTableName);
int numOfRows = testTableNumberOfRows(destTableName);
@@ -365,7 +458,19 @@ public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception {
}
@Test
- public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception {
+ public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception {
+ directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal("False");
+ }
+
+ @Test
+ public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_AtLeastOnce()
+ throws Exception {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal("True");
+ }
+
+ private void directWriteToBigQueryWithDiffInDescription_Internal(String writeAtLeastOnce)
+ throws Exception {
assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE);
int numOfRows = testTableNumberOfRows(destTableName);
@@ -383,11 +488,23 @@ public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception {
.format("bigquery")
.mode(SaveMode.Append)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.save(testDataset + "." + destTableName);
numOfRows = testTableNumberOfRows(destTableName);
assertThat(numOfRows).isEqualTo(1);
}
+ @Test
+ public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception {
+ directWriteToBigQueryWithDiffInDescription_Internal("False");
+ }
+
+ @Test
+ public void testDirectWriteToBigQueryWithDiffInDescription_AtLeastOnce() throws Exception {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ directWriteToBigQueryWithDiffInDescription_Internal("True");
+ }
+
@Test
public void testInDirectWriteToBigQueryWithDiffInSchemaAndModeCheck() throws Exception {
assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
@@ -456,8 +573,8 @@ public void testInDirectWriteToBigQueryWithDiffInDescription() throws Exception
assertThat(numOfRows).isEqualTo(1);
}
- @Test
- public void testWriteDFNullableToBigQueryNullable() throws Exception {
+ private void writeDFNullableToBigQueryNullable_Internal(String writeAtLeastOnce)
+ throws Exception {
String destTableName =
createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_NULLABLE_FIELD);
StructType srcSchema =
@@ -470,13 +587,25 @@ public void testWriteDFNullableToBigQueryNullable() throws Exception {
.mode(SaveMode.Append)
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.save(testDataset + "." + destTableName);
int numOfRows = testTableNumberOfRows(destTableName);
assertThat(numOfRows).isEqualTo(2);
}
@Test
- public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Exception {
+ public void testWriteDFNullableToBigQueryNullable() throws Exception {
+ writeDFNullableToBigQueryNullable_Internal("False");
+ }
+
+ @Test
+ public void testWriteDFNullableToBigQueryNullable_AtLeastOnce() throws Exception {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeDFNullableToBigQueryNullable_Internal("True");
+ }
+
+ private void writeDFNullableWithNonNullDataToBigQueryRequired_Internal(String writeAtLeastOnce)
+ throws Exception {
assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
String destTableName =
createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REQUIRED_FIELD);
@@ -490,11 +619,23 @@ public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Exception {
.mode(SaveMode.Append)
.option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET)
.option("writeMethod", writeMethod.toString())
+ .option("writeAtLeastOnce", writeAtLeastOnce)
.save(testDataset + "." + destTableName);
int numOfRows = testTableNumberOfRows(destTableName);
assertThat(numOfRows).isEqualTo(1);
}
+ @Test
+ public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Exception {
+ writeDFNullableWithNonNullDataToBigQueryRequired_Internal("False");
+ }
+
+ @Test
+ public void testWriteDFNullableWithNonNullDataToBigQueryRequired_AtLeastOnce() throws Exception {
+ assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
+ writeDFNullableWithNonNullDataToBigQueryRequired_Internal("True");
+ }
+
@Test
public void testWriteNullableDFWithNullDataToBigQueryRequired() {
assumeThat(writeMethod, equalTo(WriteMethod.DIRECT));
@@ -852,7 +993,7 @@ public void testWriteToBigQueryWithDescription() {
@Test
public void testWriteEmptyDataFrame() throws Exception {
Dataset<Row> df = spark.createDataFrame(Collections.emptyList(), Link.class);
- writeToBigQuery(df, SaveMode.Append);
+ writeToBigQueryAvroFormat(df, SaveMode.Append, "False");
assertThat(testTableNumberOfRows()).isEqualTo(0);
}
@@ -914,7 +1055,7 @@ public void testCacheDataFrameInDataSource() {
assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT));
Dataset<Row> allTypesTable = readAllTypesTable();
- writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro");
+ writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro", "False");
Dataset<Row> df =
spark
diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java
index daeb3e205..932d98b46 100644
--- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java
+++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java
@@ -152,6 +152,7 @@ private BigQueryDirectDataSourceWriterContext createBigQueryDirectDataSourceWrit
true,
ImmutableMap.<String, String>builder().build(),
SchemaConvertersConfiguration.of(ZoneId.of("UTC")),
- java.util.Optional.empty());
+ java.util.Optional.empty(),
+ false);
}
}