Implement at-least-once option that utilizes default stream (#1007)
Supporting streaming scenarios in which small batches of data are written.
agrawal-siddharth authored Jul 13, 2023
1 parent 2f66df1 commit 38b0ef2
Showing 12 changed files with 356 additions and 112 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -3,6 +3,7 @@
## Next

* Issue #993: Spark ML vector read and write fails
* PR #1007: Implement at-least-once option that utilizes default stream

## 0.31.1 - 2023-06-06

42 changes: 39 additions & 3 deletions README-template.md
@@ -325,11 +325,17 @@ df.writeStream \
The API Supports a number of options to configure the read

<!--- TODO(#2): Convert to markdown -->
<table>
<table id="propertytable">
<style>
table#propertytable td, table th
{
word-break:break-word
}
</style>
<tr valign="top">
<th>Property</th>
<th style="min-width:240px">Property</th>
<th>Meaning</th>
<th>Usage</th>
<th style="min-width:80px">Usage</th>
</tr>
<tr valign="top">
<td><code>table</code>
@@ -532,6 +538,17 @@ The API Supports a number of options to configure the read
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>writeAtLeastOnce</code>
</td>
<td>Guarantees that data is written to BigQuery at least once. This is a weaker
guarantee than exactly once, and is suitable for streaming scenarios
in which data is continuously written in small batches.
<br/>(Optional. Defaults to <code>false</code>)
<br/><i>Supported only by the `DIRECT` write method.</i>
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>temporaryGcsBucket</code>
</td>
@@ -1139,6 +1156,25 @@ You can manually set the number of partitions with the `maxParallelism` property

You can also always repartition after reading in Spark.

### I get quota exceeded errors while writing

If there are too many partitions, the CreateWriteStream or throughput [quotas](https://cloud.google.com/bigquery/quotas#write-api-limits)
may be exceeded. This happens because, while the data within each partition is processed serially, independent
partitions may be processed in parallel on different nodes of the Spark cluster. Generally, to ensure maximum
sustained throughput you should file a quota increase request. However, you can also mitigate the problem by
reducing the number of partitions being written, using <code>coalesce</code> on the DataFrame:

```
desiredPartitionCount = 5
dfNew = df.coalesce(desiredPartitionCount)
dfNew.write \
  .format("bigquery") \
  .save("dataset.table")  # placeholder for your destination table
```

A rule of thumb is to have a single partition handle at least 1GB of data.

Also note that a job running with the `writeAtLeastOnce` option enabled will not encounter CreateWriteStream
quota errors, since it writes to the default stream rather than creating write-streams.
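
A minimal sketch of a direct-method write with at-least-once enabled (the table name `dataset.table` is
only a placeholder):

```
df.write \
  .format("bigquery") \
  .option("writeMethod", "direct") \
  .option("writeAtLeastOnce", "true") \
  .save("dataset.table")
```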

### How do I authenticate outside GCE / Dataproc?

The connector needs an instance of a GoogleCredentials in order to connect to the BigQuery APIs. There are multiple
@@ -208,6 +208,23 @@ public boolean deleteTable(TableId tableId) {
return bigQuery.delete(tableId);
}

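// Copies all rows from the source table into the destination table by running a
// `SELECT * FROM source` query job with the given write disposition.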
private Job copyData(
TableId sourceTableId,
TableId destinationTableId,
JobInfo.WriteDisposition writeDisposition) {
String queryFormat = "SELECT * FROM `%s`";
QueryJobConfiguration queryConfig =
jobConfigurationFactory
.createQueryJobConfigurationBuilder(
sqlFromFormat(queryFormat, sourceTableId), Collections.emptyMap())
.setUseLegacySql(false)
.setDestinationTable(destinationTableId)
.setWriteDisposition(writeDisposition)
.build();

return create(JobInfo.newBuilder(queryConfig).build());
}

/**
* Overwrites the given destination table, with all the data from the given temporary table,
* transactionally.
@@ -219,28 +236,25 @@ public boolean deleteTable(TableId tableId) {
*/
public Job overwriteDestinationWithTemporary(
TableId temporaryTableId, TableId destinationTableId) {
String queryFormat =
"MERGE `%s`\n"
+ "USING (SELECT * FROM `%s`)\n"
+ "ON FALSE\n"
+ "WHEN NOT MATCHED THEN INSERT ROW\n"
+ "WHEN NOT MATCHED BY SOURCE THEN DELETE";

QueryJobConfiguration queryConfig =
jobConfigurationFactory
.createQueryJobConfigurationBuilder(
sqlFromFormat(queryFormat, destinationTableId, temporaryTableId),
Collections.emptyMap())
.setUseLegacySql(false)
.build();
return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_TRUNCATE);
}

return create(JobInfo.newBuilder(queryConfig).build());
/**
* Appends all the data from the given temporary table, to the given destination table,
* transactionally.
*
* @param temporaryTableId The {@code TableId} representing the temporary-table.
* @param destinationTableId The {@code TableId} representing the destination table.
* @return The {@code Job} object representing this operation (which can be tracked to wait until
* it has finished successfully).
*/
public Job appendDestinationWithTemporary(TableId temporaryTableId, TableId destinationTableId) {
return copyData(temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_APPEND);
}

String sqlFromFormat(String queryFormat, TableId destinationTableId, TableId temporaryTableId) {
String destinationTableName = fullTableName(destinationTableId);
String sqlFromFormat(String queryFormat, TableId temporaryTableId) {
String temporaryTableName = fullTableName(temporaryTableId);
return String.format(queryFormat, destinationTableName, temporaryTableName);
return String.format(queryFormat, temporaryTableName);
}

/**
@@ -52,6 +52,8 @@ public class BigQueryDirectDataWriterHelper {
private final ProtoSchema protoSchema;
private final RetrySettings retrySettings;
private final Optional<String> traceId;
private final int partitionId;
private final boolean writeAtLeastOnce;

private String writeStreamName;
private StreamWriter streamWriter;
@@ -66,18 +68,26 @@ public BigQueryDirectDataWriterHelper(
String tablePath,
ProtoSchema protoSchema,
RetrySettings bigqueryDataWriterHelperRetrySettings,
Optional<String> traceId) {
Optional<String> traceId,
int partitionId,
boolean writeAtLeastOnce) {
this.writeClient = writeClientFactory.getBigQueryWriteClient();
this.tablePath = tablePath;
this.protoSchema = protoSchema;
this.retrySettings = bigqueryDataWriterHelperRetrySettings;
this.traceId = traceId;

try {
this.writeStreamName = retryCreateWriteStream();
} catch (ExecutionException | InterruptedException e) {
throw new BigQueryConnectorException(
"Could not create write-stream after multiple retries", e);
this.partitionId = partitionId;
this.writeAtLeastOnce = writeAtLeastOnce;

if (writeAtLeastOnce) {
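// At-least-once writes append to the table's built-in default stream, so no
// dedicated write-stream needs to be created.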
this.writeStreamName = this.tablePath + "/_default";
} else {
try {
this.writeStreamName = retryCreateWriteStream();
} catch (ExecutionException | InterruptedException e) {
throw new BigQueryConnectorException(
"Could not create write-stream after multiple retries", e);
}
}
this.streamWriter = createStreamWriter(this.writeStreamName);
this.protoRows = ProtoRows.newBuilder();
@@ -181,7 +191,7 @@ public void addRow(ByteString message) throws IOException {
* (deduplication error) or if the response contains an error.
*/
private void sendAppendRowsRequest() throws IOException {
long offset = writeStreamRowCount;
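// The default stream used for at-least-once writes does not support append offsets,
// so -1 (no offset) is passed in that mode.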
long offset = this.writeAtLeastOnce ? -1 : writeStreamRowCount;

ApiFuture<AppendRowsResponse> appendRowsResponseApiFuture =
streamWriter.append(protoRows.build(), offset);
@@ -216,14 +226,16 @@ private void validateAppendRowsResponse(
"Append request failed with error: " + appendRowsResponse.getError().getMessage());
}

AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
long responseOffset = appendResult.getOffset().getValue();
if (!this.writeAtLeastOnce) {
AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
long responseOffset = appendResult.getOffset().getValue();

if (expectedOffset != responseOffset) {
throw new IOException(
String.format(
"On stream %s append-rows response, offset %d did not match expected offset %d",
writeStreamName, responseOffset, expectedOffset));
if (expectedOffset != responseOffset) {
throw new IOException(
String.format(
"On stream %s append-rows response, offset %d did not match expected offset %d",
writeStreamName, responseOffset, expectedOffset));
}
}
}

@@ -241,25 +253,31 @@ public long finalizeStream() throws IOException {
if (this.protoRows.getSerializedRowsCount() != 0) {
sendAppendRowsRequest();
}
long responseFinalizedRowCount = writeStreamRowCount;

waitBeforeFinalization();
if (!this.writeAtLeastOnce) {
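// Only explicitly created write-streams are finalized; the default stream used
// for at-least-once writes cannot be finalized.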
waitBeforeFinalization();

FinalizeWriteStreamRequest finalizeWriteStreamRequest =
FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
FinalizeWriteStreamResponse finalizeResponse =
retryFinalizeWriteStream(finalizeWriteStreamRequest);
FinalizeWriteStreamRequest finalizeWriteStreamRequest =
FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
FinalizeWriteStreamResponse finalizeResponse =
retryFinalizeWriteStream(finalizeWriteStreamRequest);

long expectedFinalizedRowCount = writeStreamRowCount;
long responseFinalizedRowCount = finalizeResponse.getRowCount();
if (responseFinalizedRowCount != expectedFinalizedRowCount) {
throw new IOException(
String.format(
"On stream %s finalization, expected finalized row count %d but received %d",
writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
long expectedFinalizedRowCount = writeStreamRowCount;
responseFinalizedRowCount = finalizeResponse.getRowCount();
if (responseFinalizedRowCount != expectedFinalizedRowCount) {
throw new IOException(
String.format(
"On stream %s finalization, expected finalized row count %d but received %d",
writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
}
}

logger.debug(
"Write-stream {} finalized with row-count {}", writeStreamName, responseFinalizedRowCount);
"Write-stream {} with name {} finalized with row-count {}",
partitionId,
writeStreamName,
responseFinalizedRowCount);

clean();

@@ -116,6 +116,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
public static final String ENABLE_LIST_INFERENCE = "enableListInference";
public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat";
public static final String WRITE_METHOD_PARAM = "writeMethod";
public static final String WRITE_AT_LEAST_ONCE_OPTION = "writeAtLeastOnce";
@VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW;

@VisibleForTesting
@@ -205,6 +206,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
boolean writeAtLeastOnce = false;
// for V2 write with BigQuery Storage Write API
RetrySettings bigqueryDataWriteHelperRetrySettings =
RetrySettings.newBuilder().setMaxAttempts(5).build();
@@ -372,6 +374,8 @@ public static SparkBigQueryConfig from(
getAnyOption(globalOptions, options, WRITE_METHOD_PARAM)
.transform(WriteMethod::from)
.or(writeMethodDefault);
config.writeAtLeastOnce =
getAnyBooleanOption(globalOptions, options, WRITE_AT_LEAST_ONCE_OPTION, false);

boolean validateSparkAvro =
config.writeMethod == WriteMethod.INDIRECT
@@ -943,6 +947,10 @@ public WriteMethod getWriteMethod() {
return writeMethod;
}

public boolean isWriteAtLeastOnce() {
return writeAtLeastOnce;
}

public Optional<String> getTraceId() {
return traceId.toJavaUtil();
}
@@ -92,7 +92,7 @@ public void insert(Dataset<Row> data, boolean overwrite) {
throw new BigQueryConnectorException(
String.format(
"It seems that %s out of %s partitions have failed, aborting",
numPartitions - writerCommitMessages.length, writerCommitMessages.length));
numPartitions - writerCommitMessages.length, numPartitions));
}
}
} catch (Exception e) {
@@ -73,7 +73,8 @@ public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContex
tableConfig.getEnableModeCheckForSchemaFields(),
tableConfig.getBigQueryTableLabels(),
SchemaConvertersConfiguration.from(tableConfig),
tableConfig.getKmsKeyName()); // needs to be serializable
tableConfig.getKmsKeyName(), // needs to be serializable
tableConfig.isWriteAtLeastOnce());
}

@Singleton