Merge branch 'master' into rangePart
vishalkarve15 authored Jul 17, 2023
2 parents 80c64f1 + 8198153 commit 0e5b582
Showing 31 changed files with 762 additions and 118 deletions.
5 changes: 3 additions & 2 deletions CHANGES.md
@@ -3,13 +3,14 @@
## Next

* Issue #867: Support writing with RangePartitioning
* Issue #748: `_PARTITIONDATE` pseudo column is provided only for ingestion time **daily** partitioned tables
* Issue #990: Fix to support `allowFieldAddition` for columns with nested fields.
* Issue #993: Spark ML vector read and write fails
* PR #1007: Implement at-least-once option that utilizes default stream

## 0.31.1 - 2023-06-06

* Issue #748: `_PARTITIONDATE` pseudo column is provided only for ingestion time **daily** partitioned tables
* Issue #988: Read statistics are logged at TRACE level. Update the log4j configuration accordingly in order to log them.
* Issue #990: Fix to support `allowFieldAddition` for columns with nested fields.

## 0.31.0 - 2023-06-01

45 changes: 41 additions & 4 deletions README-template.md
@@ -325,11 +325,17 @@ df.writeStream \
The API Supports a number of options to configure the read

<!--- TODO(#2): Convert to markdown -->
<table>
<table id="propertytable">
<style>
table#propertytable td, table th
{
word-break:break-word
}
</style>
<tr valign="top">
<th>Property</th>
<th style="min-width:240px">Property</th>
<th>Meaning</th>
<th>Usage</th>
<th style="min-width:80px">Usage</th>
</tr>
<tr valign="top">
<td><code>table</code>
@@ -389,7 +395,8 @@ The API Supports a number of options to configure the read
may be less if BigQuery deems the data small enough. If there are not
enough executors to schedule a reader per partition, some partitions may
be empty.
<br/>(Optional. Defaults to 3 times the application's default parallelism)</a>.)
<br/>(Optional. Defaults to the smallest of 3 times the application's default parallelism
and maxParallelism</a>.)
</td>
<td>Read</td>
</tr>
@@ -531,6 +538,17 @@ The API Supports a number of options to configure the read
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>writeAtLeastOnce</code>
</td>
<td>Guarantees that data is written to BigQuery at least once. This is a lesser
guarantee than exactly once. This is suitable for streaming scenarios
in which data is continuously being written in small batches.
<br/>(Optional. Defaults to <code>false</code>)
<br/><i>Supported only by the `DIRECT` write method.</i>
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>temporaryGcsBucket</code>
</td>
@@ -1138,6 +1156,25 @@ You can manually set the number of partitions with the `maxParallelism` property

You can also always repartition after reading in Spark.
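
For example (a sketch; the table reference below is a placeholder), the partition count can be capped at read time and then adjusted afterwards:

```python
df = spark.read \
    .format("bigquery") \
    .option("maxParallelism", 100) \
    .load("dataset.table")  # placeholder table reference

df = df.repartition(200)  # adjust the partition count after the read
```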

### I get quota exceeded errors while writing

If there are too many partitions the CreateWriteStream or Throughput [quotas](https://cloud.google.com/bigquery/quotas#write-api-limits)
may be exceeded. This occurs because while the data within each partition is processed serially, independent
partitions may be processed in parallel on different nodes within the spark cluster. Generally, to ensure maximum
sustained throughput you should file a quota increase request. However, you can also manually reduce the number of
partitions being written by calling <code>coalesce</code> on the DataFrame to mitigate this problem.

```python
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write \
    .format("bigquery") \
    .save("dataset.table")  # placeholder destination table
```

A rule of thumb is to have a single partition handle at least 1GB of data.

Also note that a job running with the `writeAtLeastOnce` property turned on will not encounter CreateWriteStream
quota errors.
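
As a rough sketch (the table name is a placeholder), the option is enabled together with the `DIRECT` write method like this:

```python
df.write \
    .format("bigquery") \
    .option("writeMethod", "direct") \
    .option("writeAtLeastOnce", "true") \
    .mode("append") \
    .save("dataset.streaming_table")  # placeholder destination table
```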

### How do I authenticate outside GCE / Dataproc?

The connector needs an instance of a GoogleCredentials in order to connect to the BigQuery APIs. There are multiple
3 changes: 2 additions & 1 deletion README.md
@@ -383,7 +383,8 @@ The API Supports a number of options to configure the read
may be less if BigQuery deems the data small enough. If there are not
enough executors to schedule a reader per partition, some partitions may
be empty.
<br/>(Optional. Defaults to 3 times the application's default parallelism)</a>.)
<br/>(Optional. Defaults to the smallest of 3 times the application's default parallelism
and maxParallelism</a>.)
</td>
<td>Read</td>
</tr>
@@ -209,6 +209,23 @@ public boolean deleteTable(TableId tableId) {
return bigQuery.delete(tableId);
}

private Job copyData(
TableId sourceTableId,
TableId destinationTableId,
JobInfo.WriteDisposition writeDisposition) {
String queryFormat = "SELECT * FROM `%s`";
QueryJobConfiguration queryConfig =
jobConfigurationFactory
.createQueryJobConfigurationBuilder(
sqlFromFormat(queryFormat, sourceTableId), Collections.emptyMap())
.setUseLegacySql(false)
.setDestinationTable(destinationTableId)
.setWriteDisposition(writeDisposition)
.build();

return create(JobInfo.newBuilder(queryConfig).build());
}

/**
* Overwrites the given destination table, with all the data from the given temporary table,
* transactionally.
@@ -220,28 +237,25 @@ public boolean deleteTable(TableId tableId) {
*/
public Job overwriteDestinationWithTemporary(
TableId temporaryTableId, TableId destinationTableId) {
String queryFormat =
"MERGE `%s`\n"
+ "USING (SELECT * FROM `%s`)\n"
+ "ON FALSE\n"
+ "WHEN NOT MATCHED THEN INSERT ROW\n"
+ "WHEN NOT MATCHED BY SOURCE THEN DELETE";

QueryJobConfiguration queryConfig =
jobConfigurationFactory
.createQueryJobConfigurationBuilder(
sqlFromFormat(queryFormat, destinationTableId, temporaryTableId),
Collections.emptyMap())
.setUseLegacySql(false)
.build();
return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_TRUNCATE);
}

return create(JobInfo.newBuilder(queryConfig).build());
/**
* Appends all the data from the given temporary table, to the given destination table,
* transactionally.
*
* @param temporaryTableId The {@code TableId} representing the temporary-table.
* @param destinationTableId The {@code TableId} representing the destination table.
* @return The {@code Job} object representing this operation (which can be tracked to wait until
* it has finished successfully).
*/
public Job appendDestinationWithTemporary(TableId temporaryTableId, TableId destinationTableId) {
return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_APPEND);
}

String sqlFromFormat(String queryFormat, TableId destinationTableId, TableId temporaryTableId) {
String destinationTableName = fullTableName(destinationTableId);
String sqlFromFormat(String queryFormat, TableId temporaryTableId) {
String temporaryTableName = fullTableName(temporaryTableId);
return String.format(queryFormat, destinationTableName, temporaryTableName);
return String.format(queryFormat, temporaryTableName);
}

/**
@@ -52,6 +52,8 @@ public class BigQueryDirectDataWriterHelper {
private final ProtoSchema protoSchema;
private final RetrySettings retrySettings;
private final Optional<String> traceId;
private final int partitionId;
private final boolean writeAtLeastOnce;

private String writeStreamName;
private StreamWriter streamWriter;
@@ -66,18 +68,26 @@ public BigQueryDirectDataWriterHelper(
String tablePath,
ProtoSchema protoSchema,
RetrySettings bigqueryDataWriterHelperRetrySettings,
Optional<String> traceId) {
Optional<String> traceId,
int partitionId,
boolean writeAtLeastOnce) {
this.writeClient = writeClientFactory.getBigQueryWriteClient();
this.tablePath = tablePath;
this.protoSchema = protoSchema;
this.retrySettings = bigqueryDataWriterHelperRetrySettings;
this.traceId = traceId;

try {
this.writeStreamName = retryCreateWriteStream();
} catch (ExecutionException | InterruptedException e) {
throw new BigQueryConnectorException(
"Could not create write-stream after multiple retries", e);
this.partitionId = partitionId;
this.writeAtLeastOnce = writeAtLeastOnce;

if (writeAtLeastOnce) {
this.writeStreamName = this.tablePath + "/_default";
} else {
try {
this.writeStreamName = retryCreateWriteStream();
} catch (ExecutionException | InterruptedException e) {
throw new BigQueryConnectorException(
"Could not create write-stream after multiple retries", e);
}
}
this.streamWriter = createStreamWriter(this.writeStreamName);
this.protoRows = ProtoRows.newBuilder();
@@ -181,7 +191,7 @@ public void addRow(ByteString message) throws IOException {
* (deduplication error) or if the response contains an error.
*/
private void sendAppendRowsRequest() throws IOException {
long offset = writeStreamRowCount;
long offset = this.writeAtLeastOnce ? -1 : writeStreamRowCount;

ApiFuture<AppendRowsResponse> appendRowsResponseApiFuture =
streamWriter.append(protoRows.build(), offset);
@@ -216,14 +226,16 @@ private void validateAppendRowsResponse(
"Append request failed with error: " + appendRowsResponse.getError().getMessage());
}

AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
long responseOffset = appendResult.getOffset().getValue();
if (!this.writeAtLeastOnce) {
AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
long responseOffset = appendResult.getOffset().getValue();

if (expectedOffset != responseOffset) {
throw new IOException(
String.format(
"On stream %s append-rows response, offset %d did not match expected offset %d",
writeStreamName, responseOffset, expectedOffset));
if (expectedOffset != responseOffset) {
throw new IOException(
String.format(
"On stream %s append-rows response, offset %d did not match expected offset %d",
writeStreamName, responseOffset, expectedOffset));
}
}
}

@@ -241,25 +253,31 @@ public long finalizeStream() throws IOException {
if (this.protoRows.getSerializedRowsCount() != 0) {
sendAppendRowsRequest();
}
long responseFinalizedRowCount = writeStreamRowCount;

waitBeforeFinalization();
if (!this.writeAtLeastOnce) {
waitBeforeFinalization();

FinalizeWriteStreamRequest finalizeWriteStreamRequest =
FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
FinalizeWriteStreamResponse finalizeResponse =
retryFinalizeWriteStream(finalizeWriteStreamRequest);
FinalizeWriteStreamRequest finalizeWriteStreamRequest =
FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
FinalizeWriteStreamResponse finalizeResponse =
retryFinalizeWriteStream(finalizeWriteStreamRequest);

long expectedFinalizedRowCount = writeStreamRowCount;
long responseFinalizedRowCount = finalizeResponse.getRowCount();
if (responseFinalizedRowCount != expectedFinalizedRowCount) {
throw new IOException(
String.format(
"On stream %s finalization, expected finalized row count %d but received %d",
writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
long expectedFinalizedRowCount = writeStreamRowCount;
responseFinalizedRowCount = finalizeResponse.getRowCount();
if (responseFinalizedRowCount != expectedFinalizedRowCount) {
throw new IOException(
String.format(
"On stream %s finalization, expected finalized row count %d but received %d",
writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
}
}

logger.debug(
"Write-stream {} finalized with row-count {}", writeStreamName, responseFinalizedRowCount);
"Write-stream {} with name {} finalized with row-count {}",
partitionId,
writeStreamName,
responseFinalizedRowCount);

clean();

7 changes: 5 additions & 2 deletions cloudbuild/nightly.sh
@@ -42,9 +42,9 @@ case $STEP in
#coverage report
$MVN test jacoco:report jacoco:report-aggregate -Pcoverage,dsv1,dsv2
# Run integration tests
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,integration,dsv1,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,integration,dsv1,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4
# Run acceptance tests
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,acceptance,dsv1,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3
$MVN failsafe:integration-test failsafe:verify jacoco:report jacoco:report-aggregate -Pcoverage,acceptance,dsv1,dsv2_2.4,dsv2_3.1,dsv2_3.2,dsv2_3.3,dsv2_3.4
# Upload test coverage report to Codecov
bash <(curl -s https://codecov.io/bash) -K -F "nightly"

@@ -79,6 +79,9 @@ case $STEP in
gsutil cp "${M2REPO}/com/google/cloud/spark/spark-3.3-bigquery/${BUILD_REVISION}/spark-3.3-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}"
gsutil cp "gs://${BUCKET}/spark-3.3-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}/spark-3.3-bigquery-nightly-snapshot.jar"

gsutil cp "${M2REPO}/com/google/cloud/spark/spark-3.4-bigquery/${BUILD_REVISION}/spark-3.4-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}"
gsutil cp "gs://${BUCKET}/spark-3.4-bigquery-${BUILD_REVISION}.jar" "gs://${BUCKET}/spark-3.4-bigquery-nightly-snapshot.jar"

exit
;;

20 changes: 20 additions & 0 deletions pom.xml
@@ -231,6 +231,26 @@
<module>spark-bigquery-pushdown/spark-3.3-bigquery-pushdown_2.13</module>
</modules>
</profile>
<profile>
<id>dsv2_3.4</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-common</module>
<module>spark-bigquery-dsv2/spark-bigquery-dsv2-parent</module>
<module>spark-bigquery-dsv2/spark-3.1-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.2-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.3-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.4-bigquery-lib</module>
<module>spark-bigquery-dsv2/spark-3.4-bigquery</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-parent</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-common_2.12</module>
<module>spark-bigquery-pushdown/spark-bigquery-pushdown-common_2.13</module>
<module>spark-bigquery-pushdown/spark-3.3-bigquery-pushdown_2.12</module>
<module>spark-bigquery-pushdown/spark-3.3-bigquery-pushdown_2.13</module>
</modules>
</profile>
<profile>
<id>coverage</id>
<activation>
@@ -117,6 +117,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
public static final String ENABLE_LIST_INFERENCE = "enableListInference";
public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat";
public static final String WRITE_METHOD_PARAM = "writeMethod";
public static final String WRITE_AT_LEAST_ONCE_OPTION = "writeAtLeastOnce";
@VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW;

@VisibleForTesting
@@ -209,6 +210,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
boolean writeAtLeastOnce = false;
// for V2 write with BigQuery Storage Write API
RetrySettings bigqueryDataWriteHelperRetrySettings =
RetrySettings.newBuilder().setMaxAttempts(5).build();
@@ -381,6 +383,8 @@ public static SparkBigQueryConfig from(
getAnyOption(globalOptions, options, WRITE_METHOD_PARAM)
.transform(WriteMethod::from)
.or(writeMethodDefault);
config.writeAtLeastOnce =
getAnyBooleanOption(globalOptions, options, WRITE_AT_LEAST_ONCE_OPTION, false);

boolean validateSparkAvro =
config.writeMethod == WriteMethod.INDIRECT
@@ -966,6 +970,10 @@ public WriteMethod getWriteMethod() {
return writeMethod;
}

public boolean isWriteAtLeastOnce() {
return writeAtLeastOnce;
}

public Optional<String> getTraceId() {
return traceId.toJavaUtil();
}
@@ -92,7 +92,7 @@ public void insert(Dataset<Row> data, boolean overwrite) {
throw new BigQueryConnectorException(
String.format(
"It seems that %s out of %s partitions have failed, aborting",
numPartitions - writerCommitMessages.length, writerCommitMessages.length));
numPartitions - writerCommitMessages.length, numPartitions));
}
}
} catch (Exception e) {
@@ -73,7 +73,8 @@ public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContex
tableConfig.getEnableModeCheckForSchemaFields(),
tableConfig.getBigQueryTableLabels(),
SchemaConvertersConfiguration.from(tableConfig),
tableConfig.getKmsKeyName()); // needs to be serializable
tableConfig.getKmsKeyName(), // needs to be serializable
tableConfig.isWriteAtLeastOnce());
}

@Singleton