From 000429bf8bf10d7e4d2f9d1664115fc91ddb93e6 Mon Sep 17 00:00:00 2001 From: Siddharth Agrawal Date: Sun, 25 Jun 2023 00:28:51 -0700 Subject: [PATCH] Implement at-least-once option that utilizes default stream to support streaming scenarios in which small batches of data are written. --- README-template.md | 42 +++- .../connector/common/BigQueryClient.java | 50 +++-- .../BigQueryDirectDataWriterHelper.java | 74 ++++--- .../spark/bigquery/SparkBigQueryConfig.java | 8 + ...eryDataSourceWriterInsertableRelation.java | 2 +- .../BigQueryDataSourceWriterModule.java | 3 +- ...BigQueryDirectDataSourceWriterContext.java | 61 ++++-- .../BigQueryDirectDataWriterContext.java | 7 +- ...igQueryDirectDataWriterContextFactory.java | 8 +- .../integration/WriteIntegrationTestBase.java | 206 +++++++++++++++--- ...ueryDirectDataSourceWriterContextTest.java | 3 +- 11 files changed, 353 insertions(+), 111 deletions(-) diff --git a/README-template.md b/README-template.md index f97eb6b7d..a511c5882 100644 --- a/README-template.md +++ b/README-template.md @@ -325,11 +325,17 @@ df.writeStream \ The API Supports a number of options to configure the read - +
+ - + - + + + + + + @@ -1139,6 +1156,25 @@ You can manually set the number of partitions with the `maxParallelism` property You can also always repartition after reading in Spark. +### I get quota exceeded errors while writing + +If there are too many partitions the CreateWriteStream or Throughput [quotas](https://cloud.google.com/bigquery/quotas#write-api-limits) +may be exceeded. This occurs because while the data within each partition is processed serially, independent +partitions may be processed in parallel on different nodes within the spark cluster. Generally, to ensure maximum +sustained throughput you should file a quota increase request. However, you can also manually reduce the number of +partitions being written by calling coalesce on the DataFrame to mitigate this problem. + +``` +desiredPartitionCount = 5 +dfNew = df.coalesce(desiredPartitionCount) +dfNew.write +``` + +A rule of thumb is to have a single partition handle at least 1GB of data. + +Also note that a job running with the `writeAtLeastOnce` property turned on will not encounter CreateWriteStream +quota errors. + ### How do I authenticate outside GCE / Dataproc? The connector needs an instance of a GoogleCredentials in order to connect to the BigQuery APIs. There are multiple diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java index 5361a1aa8..966bade06 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryClient.java @@ -208,6 +208,23 @@ public boolean deleteTable(TableId tableId) { return bigQuery.delete(tableId); } + private Job copyData( + TableId sourceTableId, + TableId destinationTableId, + JobInfo.WriteDisposition writeDisposition) { + String queryFormat = "SELECT * FROM `%s`"; + QueryJobConfiguration queryConfig = + jobConfigurationFactory + .createQueryJobConfigurationBuilder( + sqlFromFormat(queryFormat, sourceTableId), Collections.emptyMap()) + .setUseLegacySql(false) + .setDestinationTable(destinationTableId) + .setWriteDisposition(writeDisposition) + .build(); + + return create(JobInfo.newBuilder(queryConfig).build()); + } + /** * Overwrites the given destination table, with all the data from the given temporary table, * transactionally. @@ -219,28 +236,25 @@ public boolean deleteTable(TableId tableId) { */ public Job overwriteDestinationWithTemporary( TableId temporaryTableId, TableId destinationTableId) { - String queryFormat = - "MERGE `%s`\n" - + "USING (SELECT * FROM `%s`)\n" - + "ON FALSE\n" - + "WHEN NOT MATCHED THEN INSERT ROW\n" - + "WHEN NOT MATCHED BY SOURCE THEN DELETE"; - - QueryJobConfiguration queryConfig = - jobConfigurationFactory - .createQueryJobConfigurationBuilder( - sqlFromFormat(queryFormat, destinationTableId, temporaryTableId), - Collections.emptyMap()) - .setUseLegacySql(false) - .build(); + return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_TRUNCATE); + } - return create(JobInfo.newBuilder(queryConfig).build()); + /** + * Appends all the data from the given temporary table, to the given destination table, + * transactionally. + * + * @param temporaryTableId The {@code TableId} representing the temporary-table. + * @param destinationTableId The {@code TableId} representing the destination table. 
+ * @return The {@code Job} object representing this operation (which can be tracked to wait until + * it has finished successfully). + */ + public Job appendDestinationWithTemporary(TableId temporaryTableId, TableId destinationTableId) { + return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_APPEND); } - String sqlFromFormat(String queryFormat, TableId destinationTableId, TableId temporaryTableId) { - String destinationTableName = fullTableName(destinationTableId); + String sqlFromFormat(String queryFormat, TableId temporaryTableId) { String temporaryTableName = fullTableName(temporaryTableId); - return String.format(queryFormat, destinationTableName, temporaryTableName); + return String.format(queryFormat, temporaryTableName); } /** diff --git a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java index 6d08f6a64..4905661c5 100644 --- a/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java +++ b/bigquery-connector-common/src/main/java/com/google/cloud/bigquery/connector/common/BigQueryDirectDataWriterHelper.java @@ -52,6 +52,8 @@ public class BigQueryDirectDataWriterHelper { private final ProtoSchema protoSchema; private final RetrySettings retrySettings; private final Optional traceId; + private final int partitionId; + private final boolean writeAtLeastOnce; private String writeStreamName; private StreamWriter streamWriter; @@ -66,18 +68,26 @@ public BigQueryDirectDataWriterHelper( String tablePath, ProtoSchema protoSchema, RetrySettings bigqueryDataWriterHelperRetrySettings, - Optional traceId) { + Optional traceId, + int partitionId, + boolean writeAtLeastOnce) { this.writeClient = writeClientFactory.getBigQueryWriteClient(); this.tablePath = tablePath; this.protoSchema = protoSchema; this.retrySettings = bigqueryDataWriterHelperRetrySettings; this.traceId = traceId; - - try { - this.writeStreamName = retryCreateWriteStream(); - } catch (ExecutionException | InterruptedException e) { - throw new BigQueryConnectorException( - "Could not create write-stream after multiple retries", e); + this.partitionId = partitionId; + this.writeAtLeastOnce = writeAtLeastOnce; + + if (writeAtLeastOnce) { + this.writeStreamName = this.tablePath + "/_default"; + } else { + try { + this.writeStreamName = retryCreateWriteStream(); + } catch (ExecutionException | InterruptedException e) { + throw new BigQueryConnectorException( + "Could not create write-stream after multiple retries", e); + } } this.streamWriter = createStreamWriter(this.writeStreamName); this.protoRows = ProtoRows.newBuilder(); @@ -181,7 +191,7 @@ public void addRow(ByteString message) throws IOException { * (deduplication error) or if the response contains an error. */ private void sendAppendRowsRequest() throws IOException { - long offset = writeStreamRowCount; + long offset = this.writeAtLeastOnce ? 
-1 : writeStreamRowCount; ApiFuture appendRowsResponseApiFuture = streamWriter.append(protoRows.build(), offset); @@ -216,14 +226,16 @@ private void validateAppendRowsResponse( "Append request failed with error: " + appendRowsResponse.getError().getMessage()); } - AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult(); - long responseOffset = appendResult.getOffset().getValue(); + if (!this.writeAtLeastOnce) { + AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult(); + long responseOffset = appendResult.getOffset().getValue(); - if (expectedOffset != responseOffset) { - throw new IOException( - String.format( - "On stream %s append-rows response, offset %d did not match expected offset %d", - writeStreamName, responseOffset, expectedOffset)); + if (expectedOffset != responseOffset) { + throw new IOException( + String.format( + "On stream %s append-rows response, offset %d did not match expected offset %d", + writeStreamName, responseOffset, expectedOffset)); + } } } @@ -241,25 +253,31 @@ public long finalizeStream() throws IOException { if (this.protoRows.getSerializedRowsCount() != 0) { sendAppendRowsRequest(); } + long responseFinalizedRowCount = writeStreamRowCount; - waitBeforeFinalization(); + if (!this.writeAtLeastOnce) { + waitBeforeFinalization(); - FinalizeWriteStreamRequest finalizeWriteStreamRequest = - FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build(); - FinalizeWriteStreamResponse finalizeResponse = - retryFinalizeWriteStream(finalizeWriteStreamRequest); + FinalizeWriteStreamRequest finalizeWriteStreamRequest = + FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build(); + FinalizeWriteStreamResponse finalizeResponse = + retryFinalizeWriteStream(finalizeWriteStreamRequest); - long expectedFinalizedRowCount = writeStreamRowCount; - long responseFinalizedRowCount = finalizeResponse.getRowCount(); - if (responseFinalizedRowCount != expectedFinalizedRowCount) { - throw new IOException( - String.format( - "On stream %s finalization, expected finalized row count %d but received %d", - writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount)); + long expectedFinalizedRowCount = writeStreamRowCount; + responseFinalizedRowCount = finalizeResponse.getRowCount(); + if (responseFinalizedRowCount != expectedFinalizedRowCount) { + throw new IOException( + String.format( + "On stream %s finalization, expected finalized row count %d but received %d", + writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount)); + } } logger.debug( - "Write-stream {} finalized with row-count {}", writeStreamName, responseFinalizedRowCount); + "Write-stream {} with name {} finalized with row-count {}", + partitionId, + writeStreamName, + responseFinalizedRowCount); clean(); diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java index 5409c87d2..6389e06a6 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/SparkBigQueryConfig.java @@ -116,6 +116,7 @@ public static WriteMethod from(@Nullable String writeMethod) { public static final String ENABLE_LIST_INFERENCE = "enableListInference"; public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat"; public static final 
String WRITE_METHOD_PARAM = "writeMethod"; + public static final String WRITE_AT_LEAST_ONCE_OPTION = "writeAtLeastOnce"; @VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW; @VisibleForTesting @@ -205,6 +206,7 @@ public static WriteMethod from(@Nullable String writeMethod) { private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig; private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC; private WriteMethod writeMethod = DEFAULT_WRITE_METHOD; + boolean writeAtLeastOnce = false; // for V2 write with BigQuery Storage Write API RetrySettings bigqueryDataWriteHelperRetrySettings = RetrySettings.newBuilder().setMaxAttempts(5).build(); @@ -372,6 +374,8 @@ public static SparkBigQueryConfig from( getAnyOption(globalOptions, options, WRITE_METHOD_PARAM) .transform(WriteMethod::from) .or(writeMethodDefault); + config.writeAtLeastOnce = + getAnyBooleanOption(globalOptions, options, WRITE_AT_LEAST_ONCE_OPTION, false); boolean validateSparkAvro = config.writeMethod == WriteMethod.INDIRECT @@ -943,6 +947,10 @@ public WriteMethod getWriteMethod() { return writeMethod; } + public boolean isWriteAtLeastOnce() { + return writeAtLeastOnce; + } + public Optional getTraceId() { return traceId.toJavaUtil(); } diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java index 3b5ca50e9..119504d2a 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/BigQueryDataSourceWriterInsertableRelation.java @@ -92,7 +92,7 @@ public void insert(Dataset data, boolean overwrite) { throw new BigQueryConnectorException( String.format( "It seems that %s out of %s partitions have failed, aborting", - numPartitions - writerCommitMessages.length, writerCommitMessages.length)); + numPartitions - writerCommitMessages.length, numPartitions)); } } } catch (Exception e) { diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java index c173838d8..5c25fef6d 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDataSourceWriterModule.java @@ -73,7 +73,8 @@ public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContex tableConfig.getEnableModeCheckForSchemaFields(), tableConfig.getBigQueryTableLabels(), SchemaConvertersConfiguration.from(tableConfig), - tableConfig.getKmsKeyName()); // needs to be serializable + tableConfig.getKmsKeyName(), // needs to be serializable + tableConfig.isWriteAtLeastOnce()); } @Singleton diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java index 9d074d435..6ef480769 100644 --- 
a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContext.java @@ -61,12 +61,14 @@ public class BigQueryDirectDataSourceWriterContext implements DataSourceWriterCo private final String tablePathForBigQueryStorage; private final SchemaConvertersConfiguration schemaConvertersConfiguration; private final Optional destinationTableKmsKeyName; + private final boolean writeAtLeastOnce; private BigQueryWriteClient writeClient; private Optional tableInfo = Optional.absent(); enum WritingMode { IGNORE_INPUTS, + APPEND_AT_LEAST_ONCE, OVERWRITE, ALL_ELSE } @@ -85,7 +87,8 @@ public BigQueryDirectDataSourceWriterContext( boolean enableModeCheckForSchemaFields, ImmutableMap tableLabels, SchemaConvertersConfiguration schemaConvertersConfiguration, - java.util.Optional destinationTableKmsKeyName) + java.util.Optional destinationTableKmsKeyName, + boolean writeAtLeastOnce) throws IllegalArgumentException { this.bigQueryClient = bigQueryClient; this.writeClientFactory = bigQueryWriteClientFactory; @@ -98,6 +101,7 @@ public BigQueryDirectDataSourceWriterContext( this.tableLabels = tableLabels; this.schemaConvertersConfiguration = schemaConvertersConfiguration; this.destinationTableKmsKeyName = Optional.fromJavaUtil(destinationTableKmsKeyName); + this.writeAtLeastOnce = writeAtLeastOnce; Schema bigQuerySchema = SchemaConverters.from(this.schemaConvertersConfiguration).toBigQuerySchema(sparkSchema); try { @@ -142,6 +146,12 @@ private BigQueryTable getOrCreateTable( "Destination table's schema is not compatible with dataframe's schema")); switch (saveMode) { case Append: + if (writeAtLeastOnce) { + writingMode = WritingMode.APPEND_AT_LEAST_ONCE; + return new BigQueryTable( + bigQueryClient.createTempTable(destinationTableId, bigQuerySchema).getTableId(), + true); + } break; case Overwrite: writingMode = WritingMode.OVERWRITE; @@ -177,7 +187,8 @@ public DataWriterContextFactory createWriterContextFactory() { protoSchema, writingMode.equals(WritingMode.IGNORE_INPUTS), bigqueryDataWriterHelperRetrySettings, - traceId); + traceId, + writeAtLeastOnce); } @Override @@ -203,29 +214,35 @@ public void commit(WriterCommitMessageContext[] messages) { writeUUID, Arrays.toString(messages)); - BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest = - BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage); - for (WriterCommitMessageContext message : messages) { - batchCommitWriteStreamsRequest.addWriteStreams( - ((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName()); - } - BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse = - writeClient.batchCommitWriteStreams(batchCommitWriteStreamsRequest.build()); + if (!writeAtLeastOnce) { + BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest = + BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage); + for (WriterCommitMessageContext message : messages) { + batchCommitWriteStreamsRequest.addWriteStreams( + ((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName()); + } + BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse = + writeClient.batchCommitWriteStreams(batchCommitWriteStreamsRequest.build()); - if (!batchCommitWriteStreamsResponse.hasCommitTime()) { - throw new BigQueryConnectorException( - "DataSource writer failed to 
batch commit its BigQuery write-streams"); - } + if (!batchCommitWriteStreamsResponse.hasCommitTime()) { + throw new BigQueryConnectorException( + "DataSource writer failed to batch commit its BigQuery write-streams"); + } - logger.info( - "BigQuery DataSource writer has committed at time: {}", - batchCommitWriteStreamsResponse.getCommitTime()); + logger.info( + "BigQuery DataSource writer has committed at time: {}", + batchCommitWriteStreamsResponse.getCommitTime()); + } - if (writingMode.equals(WritingMode.OVERWRITE)) { - Job overwriteJob = - bigQueryClient.overwriteDestinationWithTemporary( - tableToWrite.getTableId(), destinationTableId); - BigQueryClient.waitForJob(overwriteJob); + if (writingMode.equals(WritingMode.APPEND_AT_LEAST_ONCE) + || writingMode.equals(WritingMode.OVERWRITE)) { + Job queryJob = + (writingMode.equals(WritingMode.OVERWRITE)) + ? bigQueryClient.overwriteDestinationWithTemporary( + tableToWrite.getTableId(), destinationTableId) + : bigQueryClient.appendDestinationWithTemporary( + tableToWrite.getTableId(), destinationTableId); + BigQueryClient.waitForJob(queryJob); Preconditions.checkState( bigQueryClient.deleteTable(tableToWrite.getTableId()), new BigQueryConnectorException( diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java index c74979ab4..811131fd9 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContext.java @@ -57,7 +57,8 @@ public BigQueryDirectDataWriterContext( StructType sparkSchema, ProtoSchema protoSchema, RetrySettings bigqueryDataWriterHelperRetrySettings, - Optional traceId) { + Optional traceId, + boolean writeAtLeastOnce) { this.partitionId = partitionId; this.taskId = taskId; this.epochId = epochId; @@ -76,7 +77,9 @@ public BigQueryDirectDataWriterContext( tablePath, protoSchema, bigqueryDataWriterHelperRetrySettings, - traceId); + traceId, + partitionId, + writeAtLeastOnce); } @Override diff --git a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java index 6284681aa..5b9687744 100644 --- a/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java +++ b/spark-bigquery-connector-common/src/main/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataWriterContextFactory.java @@ -31,6 +31,7 @@ public class BigQueryDirectDataWriterContextFactory private final boolean ignoreInputs; private final RetrySettings bigqueryDataWriterHelperRetrySettings; private final Optional traceId; + private final boolean writeAtLeastOnce; public BigQueryDirectDataWriterContextFactory( BigQueryClientFactory writeClientFactory, @@ -39,7 +40,8 @@ public BigQueryDirectDataWriterContextFactory( ProtoSchema protoSchema, boolean ignoreInputs, RetrySettings bigqueryDataWriterHelperRetrySettings, - Optional traceId) { + Optional traceId, + boolean writeAtLeastOnce) { this.writeClientFactory = writeClientFactory; this.tablePath = tablePath; 
this.sparkSchema = sparkSchema; @@ -47,6 +49,7 @@ public BigQueryDirectDataWriterContextFactory( this.ignoreInputs = ignoreInputs; this.bigqueryDataWriterHelperRetrySettings = bigqueryDataWriterHelperRetrySettings; this.traceId = traceId; + this.writeAtLeastOnce = writeAtLeastOnce; } /** @@ -75,6 +78,7 @@ public DataWriterContext createDataWriterContext( sparkSchema, protoSchema, bigqueryDataWriterHelperRetrySettings, - traceId); + traceId, + writeAtLeastOnce); } } diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java index d02dc75fe..5f6cc8218 100644 --- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java +++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/WriteIntegrationTestBase.java @@ -162,10 +162,15 @@ private StandardTableDefinition testPartitionedTableDefinition() { } protected void writeToBigQuery(Dataset df, SaveMode mode) { - writeToBigQuery(df, mode, "avro"); + writeToBigQuery(df, mode, "False"); } - protected void writeToBigQuery(Dataset df, SaveMode mode, String format) { + protected void writeToBigQuery(Dataset df, SaveMode mode, String writeAtLeastOnce) { + writeToBigQuery(df, mode, "avro", writeAtLeastOnce); + } + + protected void writeToBigQuery( + Dataset df, SaveMode mode, String format, String writeAtLeastOnce) { df.write() .format("bigquery") .mode(mode) @@ -173,6 +178,7 @@ protected void writeToBigQuery(Dataset df, SaveMode mode, String format) { .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET) .option("intermediateFormat", format) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .save(); } @@ -185,20 +191,30 @@ Dataset readAllTypesTable() { .load(); } - @Test - public void testWriteToBigQuery_AppendSaveMode() throws InterruptedException { + private void writeToBigQuery_AppendSaveMode_Internal(String writeAtLeastOnce) + throws InterruptedException { // initial write - writeToBigQuery(initialData(), SaveMode.Append); + writeToBigQuery(initialData(), SaveMode.Append, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); // second write - writeToBigQuery(additonalData(), SaveMode.Append); + writeToBigQuery(additonalData(), SaveMode.Append, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(4); assertThat(additionalDataValuesExist()).isTrue(); } @Test - public void testWriteToBigQuery_WithTableLabels() { + public void testWriteToBigQuery_AppendSaveMode() throws InterruptedException { + writeToBigQuery_AppendSaveMode_Internal("False"); + } + + @Test + public void testWriteToBigQuery_AppendSaveMode_AtLeastOnce() throws InterruptedException { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeToBigQuery_AppendSaveMode_Internal("True"); + } + + private void writeToBigQuery_WithTableLabels_Internal(String writeAtLeastOnce) { Dataset df = initialData(); df.write() @@ -208,6 +224,7 @@ public void testWriteToBigQuery_WithTableLabels() { .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET) .option("intermediateFormat", "avro") .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .option("bigQueryTableLabel.alice", "bob") .option("bigQueryTableLabel.foo", "bar") 
.save(); @@ -220,7 +237,18 @@ public void testWriteToBigQuery_WithTableLabels() { } @Test - public void testWriteToBigQuery_EnableListInference() throws InterruptedException { + public void testWriteToBigQuery_WithTableLabels() { + writeToBigQuery_WithTableLabels_Internal("False"); + } + + @Test + public void testWriteToBigQuery_WithTableLabels_AtLeastOnce() { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeToBigQuery_WithTableLabels_Internal("True"); + } + + private void writeToBigQuery_EnableListInference_Internal(String writeAtLeastOnce) + throws InterruptedException { Dataset df = initialData(); df.write() .format("bigquery") @@ -229,6 +257,7 @@ public void testWriteToBigQuery_EnableListInference() throws InterruptedExceptio .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET) .option("intermediateFormat", "parquet") .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .option("enableListInference", true) .save(); @@ -246,9 +275,20 @@ public void testWriteToBigQuery_EnableListInference() throws InterruptedExceptio } @Test - public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedException { + public void testWriteToBigQuery_EnableListInference() throws InterruptedException { + writeToBigQuery_EnableListInference_Internal("False"); + } + + @Test + public void testWriteToBigQuery_EnableListInference_AtLeastOnce() throws InterruptedException { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeToBigQuery_EnableListInference_Internal("True"); + } + + private void writeToBigQuery_ErrorIfExistsSaveMode_Internal(String writeAtLeastOnce) + throws InterruptedException { // initial write - writeToBigQuery(initialData(), SaveMode.ErrorIfExists); + writeToBigQuery(initialData(), SaveMode.ErrorIfExists, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); assertThrows( @@ -257,57 +297,96 @@ public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedExcept } @Test - public void testWriteToBigQuery_IgnoreSaveMode() throws InterruptedException { + public void testWriteToBigQuery_ErrorIfExistsSaveMode() throws InterruptedException { + writeToBigQuery_ErrorIfExistsSaveMode_Internal("False"); + } + + @Test + public void testWriteToBigQuery_ErrorIfExistsSaveMode_AtLeastOnce() throws InterruptedException { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeToBigQuery_ErrorIfExistsSaveMode_Internal("True"); + } + + private void writeToBigQuery_IgnoreSaveMode_Internal(String writeAtLeastOnce) + throws InterruptedException { // initial write - writeToBigQuery(initialData(), SaveMode.Ignore); + writeToBigQuery(initialData(), SaveMode.Ignore, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); // second write - writeToBigQuery(additonalData(), SaveMode.Ignore); + writeToBigQuery(additonalData(), SaveMode.Ignore, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); assertThat(additionalDataValuesExist()).isFalse(); } @Test - public void testWriteToBigQuery_OverwriteSaveMode() throws InterruptedException { + public void testWriteToBigQuery_IgnoreSaveMode() throws InterruptedException { + writeToBigQuery_IgnoreSaveMode_Internal("False"); + } + + @Test + public void testWriteToBigQuery_IgnoreSaveMode_AtLeastOnce() throws InterruptedException { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + 
writeToBigQuery_IgnoreSaveMode_Internal("True"); + } + + private void writeToBigQuery_OverwriteSaveMode_Internal(String writeAtLeastOnce) + throws InterruptedException { // initial write - writeToBigQuery(initialData(), SaveMode.Overwrite); + writeToBigQuery(initialData(), SaveMode.Overwrite, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); - // Adding a two minute cushion as the data takes some time to move from buffer to the actual - // table. Without this cushion, get the following error: - // "UPDATE or DELETE statement over {DestinationTable} would affect rows in the streaming - // buffer, which is not supported" - Thread.sleep(120 * 1000); - // second write - writeToBigQuery(additonalData(), SaveMode.Overwrite); + writeToBigQuery(additonalData(), SaveMode.Overwrite, writeAtLeastOnce); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isFalse(); assertThat(additionalDataValuesExist()).isTrue(); } + @Test + public void testWriteToBigQuery_OverwriteSaveMode() throws InterruptedException { + writeToBigQuery_OverwriteSaveMode_Internal("False"); + } + + @Test + public void testWriteToBigQuery_OverwriteSaveMode_AtLeastOnce() throws InterruptedException { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeToBigQuery_OverwriteSaveMode_Internal("True"); + } + @Test public void testWriteToBigQuery_AvroFormat() throws InterruptedException { - writeToBigQuery(initialData(), SaveMode.ErrorIfExists, "avro"); + writeToBigQuery(initialData(), SaveMode.ErrorIfExists, "avro", "False"); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); } - @Test - public void testWriteToBigQuerySimplifiedApi() throws InterruptedException { + private void writeToBigQuerySimplifiedApi_Internal(String writeAtLeastOnce) + throws InterruptedException { initialData() .write() .format("bigquery") .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .save(fullTableName()); assertThat(testTableNumberOfRows()).isEqualTo(2); assertThat(initialDataValuesExist()).isTrue(); } + @Test + public void testWriteToBigQuerySimplifiedApi() throws InterruptedException { + writeToBigQuerySimplifiedApi_Internal("False"); + } + + @Test + public void testWriteToBigQuerySimplifiedApi_AtLeastOnce() throws InterruptedException { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeToBigQuerySimplifiedApi_Internal("True"); + } + @Test public void testWriteToBigQueryAddingTheSettingsToSparkConf() throws InterruptedException { spark.conf().set("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET); @@ -321,8 +400,8 @@ public void testWriteToBigQueryAddingTheSettingsToSparkConf() throws Interrupted assertThat(initialDataValuesExist()).isTrue(); } - @Test - public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception { + private void directWriteToBigQueryWithDiffInSchema_Internal(String writeAtLeastOnce) + throws Exception { assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE); int numOfRows = testTableNumberOfRows(destTableName); @@ -338,13 +417,25 @@ public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception { .format("bigquery") .mode(SaveMode.Append) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) 
.save(testDataset + "." + destTableName); numOfRows = testTableNumberOfRows(destTableName); assertThat(numOfRows).isEqualTo(1); } @Test - public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception { + public void testDirectWriteToBigQueryWithDiffInSchema() throws Exception { + directWriteToBigQueryWithDiffInSchema_Internal("False"); + } + + @Test + public void testDirectWriteToBigQueryWithDiffInSchema_AtLeastOnce() throws Exception { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + directWriteToBigQueryWithDiffInSchema_Internal("True"); + } + + private void directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal( + String writeAtLeastOnce) throws Exception { assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE); Dataset df = @@ -358,6 +449,7 @@ public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throw .format("bigquery") .mode(SaveMode.Append) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .option("enableModeCheckForSchemaFields", false) .save(testDataset + "." + destTableName); int numOfRows = testTableNumberOfRows(destTableName); @@ -365,7 +457,19 @@ public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throw } @Test - public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception { + public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck() throws Exception { + directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal("False"); + } + + @Test + public void testDirectWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_AtLeastOnce() + throws Exception { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + directWriteToBigQueryWithDiffInSchemaAndDisableModeCheck_Internal("True"); + } + + private void directWriteToBigQueryWithDiffInDescription_Internal(String writeAtLeastOnce) + throws Exception { assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE); int numOfRows = testTableNumberOfRows(destTableName); @@ -383,11 +487,23 @@ public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception { .format("bigquery") .mode(SaveMode.Append) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .save(testDataset + "." 
+ destTableName); numOfRows = testTableNumberOfRows(destTableName); assertThat(numOfRows).isEqualTo(1); } + @Test + public void testDirectWriteToBigQueryWithDiffInDescription() throws Exception { + directWriteToBigQueryWithDiffInDescription_Internal("False"); + } + + @Test + public void testDirectWriteToBigQueryWithDiffInDescription_AtLeastOnce() throws Exception { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + directWriteToBigQueryWithDiffInDescription_Internal("True"); + } + @Test public void testInDirectWriteToBigQueryWithDiffInSchemaAndModeCheck() throws Exception { assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT)); @@ -456,8 +572,8 @@ public void testInDirectWriteToBigQueryWithDiffInDescription() throws Exception assertThat(numOfRows).isEqualTo(1); } - @Test - public void testWriteDFNullableToBigQueryNullable() throws Exception { + private void writeDFNullableToBigQueryNullable_Internal(String writeAtLeastOnce) + throws Exception { String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_NULLABLE_FIELD); StructType srcSchema = @@ -470,13 +586,25 @@ public void testWriteDFNullableToBigQueryNullable() throws Exception { .mode(SaveMode.Append) .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .save(testDataset + "." + destTableName); int numOfRows = testTableNumberOfRows(destTableName); assertThat(numOfRows).isEqualTo(2); } @Test - public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Exception { + public void testWriteDFNullableToBigQueryNullable() throws Exception { + writeDFNullableToBigQueryNullable_Internal("False"); + } + + @Test + public void testWriteDFNullableToBigQueryNullable_AtLeastOnce() throws Exception { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeDFNullableToBigQueryNullable_Internal("True"); + } + + private void writeDFNullableWithNonNullDataToBigQueryRequired_Internal(String writeAtLeastOnce) + throws Exception { assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); String destTableName = createDiffInSchemaDestTable(TestConstants.DIFF_IN_SCHEMA_DEST_TABLE_WITH_REQUIRED_FIELD); @@ -490,11 +618,23 @@ public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Except .mode(SaveMode.Append) .option("temporaryGcsBucket", TestConstants.TEMPORARY_GCS_BUCKET) .option("writeMethod", writeMethod.toString()) + .option("writeAtLeastOnce", writeAtLeastOnce) .save(testDataset + "." 
+ destTableName); int numOfRows = testTableNumberOfRows(destTableName); assertThat(numOfRows).isEqualTo(1); } + @Test + public void testWriteDFNullableWithNonNullDataToBigQueryRequired() throws Exception { + writeDFNullableWithNonNullDataToBigQueryRequired_Internal("False"); + } + + @Test + public void testWriteDFNullableWithNonNullDataToBigQueryRequired_AtLeastOnce() throws Exception { + assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); + writeDFNullableWithNonNullDataToBigQueryRequired_Internal("True"); + } + @Test public void testWriteNullableDFWithNullDataToBigQueryRequired() { assumeThat(writeMethod, equalTo(WriteMethod.DIRECT)); @@ -914,7 +1054,7 @@ public void testCacheDataFrameInDataSource() { assumeThat(writeMethod, equalTo(SparkBigQueryConfig.WriteMethod.INDIRECT)); Dataset allTypesTable = readAllTypesTable(); - writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro"); + writeToBigQuery(allTypesTable, SaveMode.Overwrite, "avro", "False"); Dataset df = spark diff --git a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java index daeb3e205..932d98b46 100644 --- a/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java +++ b/spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/write/context/BigQueryDirectDataSourceWriterContextTest.java @@ -152,6 +152,7 @@ private BigQueryDirectDataSourceWriterContext createBigQueryDirectDataSourceWrit true, ImmutableMap.builder().build(), SchemaConvertersConfiguration.of(ZoneId.of("UTC")), - java.util.Optional.empty()); + java.util.Optional.empty(), + false); } }
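Taken together, the new behaviour is opt-in and is driven entirely by connector options. Below is a minimal, self-contained sketch of a batch append that turns it on; the SparkSession setup, the generated DataFrame and the `mydataset.mytable` destination are placeholders, and the option names are simply the ones exercised by the tests above (`writeMethod`, `writeAtLeastOnce`):

```java
import static org.apache.spark.sql.functions.rand;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class AtLeastOnceWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("at-least-once-write").getOrCreate();

    // Placeholder data; any DataFrame whose schema matches the destination table works.
    Dataset<Row> df = spark.range(1000).withColumn("value", rand());

    df.write()
        .format("bigquery")
        .mode(SaveMode.Append)
        // writeAtLeastOnce is only honoured by the direct (Storage Write API) path.
        .option("writeMethod", "direct")
        // Appends go through the table's default stream instead of per-partition
        // pending streams, trading exactly-once for at-least-once semantics.
        .option("writeAtLeastOnce", "true")
        .save("mydataset.mytable"); // placeholder destination

    spark.stop();
  }
}
```

For structured streaming the same two options apply. Because every micro-batch is appended through the table's default stream, no CreateWriteStream calls are made, which is why the FAQ entry above notes that jobs running with `writeAtLeastOnce` do not hit that quota.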
   <tr valign="top">
    <th>Property</th>
    <th>Meaning</th>
    <th>Usage</th>
   </tr>
+  <tr valign="top">
+    <td><code>writeAtLeastOnce</code></td>
+    <td>Guarantees that data is written to BigQuery at least once. This is a lesser
+        guarantee than exactly once. This is suitable for streaming scenarios
+        in which data is continuously being written in small batches.
+        <br/>(Optional. Defaults to <code>false</code>)
+        <br/><i>Supported only by the `DIRECT` write method.</i>
+    </td>
+    <td>Write</td>
+  </tr>
   <tr valign="top">
    <td><code>temporaryGcsBucket</code></td>
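For reference, the following is a rough sketch of what the at-least-once path boils down to at the Storage Write API level, mirroring what `BigQueryDirectDataWriterHelper` does when `writeAtLeastOnce` is set: the writer targets the table's `_default` stream, appends with no offset (the helper passes -1), and there is no finalize or batch-commit step afterwards. The class and method names here are illustrative only and are not part of the connector:

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;

/** Illustrative sketch: append a batch of proto rows to a table's default stream. */
public class DefaultStreamAppendSketch {

  // tablePath is expected to look like "projects/p/datasets/d/tables/t".
  static AppendRowsResponse appendAtLeastOnce(
      String tablePath, ProtoSchema protoSchema, ProtoRows rows) throws Exception {
    String defaultStream = tablePath + "/_default";
    StreamWriter writer =
        StreamWriter.newBuilder(defaultStream).setWriterSchema(protoSchema).build();
    try {
      // Offset -1 means "no offset": the server performs no offset-based deduplication,
      // so a retried append can land twice. That is the at-least-once trade-off.
      ApiFuture<AppendRowsResponse> future = writer.append(rows, -1);
      return future.get();
    } finally {
      writer.close();
    }
  }
}
```

Because the default stream makes rows visible as soon as each append is acknowledged, the connector still stages at-least-once appends for `SaveMode.Append` into a temporary table and then copies them into the destination with a WRITE_APPEND query job (see `appendDestinationWithTemporary` above), so a failed Spark job does not leave partial data in the target table.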