Implement at-least-once option that utilizes the default stream
to support streaming scenarios in which small batches of data
are written.
agrawal-siddharth committed Jun 28, 2023
1 parent 02b189b commit 1c48241
Showing 11 changed files with 208 additions and 96 deletions.
11 changes: 11 additions & 0 deletions README.md
@@ -525,6 +525,17 @@ The API Supports a number of options to configure the read
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>writeAtLeastOnce</code>
</td>
<td>Guarantees that data is written to BigQuery at least once. This is a weaker
   guarantee than exactly once, and is suitable for streaming scenarios
   in which data is continuously written in small batches.
   <br/>(Optional. Defaults to <code>false</code>)
   <br/><i>Supported only by the <code>DIRECT</code> write method.</i>
</td>
<td>Write</td>
</tr>
<tr valign="top">
<td><code>temporaryGcsBucket</code>
</td>
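For context, a minimal usage sketch of the new option (Java; `batch` and `mydataset.mytable` are placeholder names, not part of this change): `writeAtLeastOnce` only takes effect together with the `DIRECT` write method and is enabled per write via `.option(...)`.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class WriteAtLeastOnceExample {
  // Append one micro-batch to BigQuery through the Storage Write API default stream.
  public static void writeBatch(Dataset<Row> batch) {
    batch.write()
        .format("bigquery")
        .option("writeMethod", "direct")     // writeAtLeastOnce is honored only by DIRECT writes
        .option("writeAtLeastOnce", "true")  // opt in to the default-stream, at-least-once path
        .mode(SaveMode.Append)
        .save("mydataset.mytable");          // placeholder destination table
  }
}
```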
@@ -208,6 +208,23 @@ public boolean deleteTable(TableId tableId) {
return bigQuery.delete(tableId);
}

private Job buildQueryJob(
TableId temporaryTableId,
TableId destinationTableId,
JobInfo.WriteDisposition writeDisposition) {
String queryFormat = "SELECT * FROM `%s`";
QueryJobConfiguration queryConfig =
jobConfigurationFactory
.createQueryJobConfigurationBuilder(
sqlFromFormat(queryFormat, temporaryTableId), Collections.emptyMap())
.setUseLegacySql(false)
.setDestinationTable(destinationTableId)
.setWriteDisposition(writeDisposition)
.build();

return create(JobInfo.newBuilder(queryConfig).build());
}

/**
* Overwrites the given destination table, with all the data from the given temporary table,
* transactionally.
@@ -219,28 +236,27 @@ public boolean deleteTable(TableId tableId) {
*/
public Job overwriteDestinationWithTemporary(
TableId temporaryTableId, TableId destinationTableId) {
String queryFormat =
"MERGE `%s`\n"
+ "USING (SELECT * FROM `%s`)\n"
+ "ON FALSE\n"
+ "WHEN NOT MATCHED THEN INSERT ROW\n"
+ "WHEN NOT MATCHED BY SOURCE THEN DELETE";

QueryJobConfiguration queryConfig =
jobConfigurationFactory
.createQueryJobConfigurationBuilder(
sqlFromFormat(queryFormat, destinationTableId, temporaryTableId),
Collections.emptyMap())
.setUseLegacySql(false)
.build();
return buildQueryJob(
temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_TRUNCATE);
}

return create(JobInfo.newBuilder(queryConfig).build());
/**
* Appends all the data from the given temporary table to the given destination table,
* transactionally.
*
* @param temporaryTableId The {@code TableId} representing the temporary-table.
* @param destinationTableId The {@code TableId} representing the destination table.
* @return The {@code Job} object representing this operation (which can be tracked to wait until
* it has finished successfully).
*/
public Job appendDestinationWithTemporary(TableId temporaryTableId, TableId destinationTableId) {
return buildQueryJob(
temporaryTableId, destinationTableId, JobInfo.WriteDisposition.WRITE_APPEND);
}

String sqlFromFormat(String queryFormat, TableId destinationTableId, TableId temporaryTableId) {
String destinationTableName = fullTableName(destinationTableId);
String sqlFromFormat(String queryFormat, TableId temporaryTableId) {
String temporaryTableName = fullTableName(temporaryTableId);
return String.format(queryFormat, destinationTableName, temporaryTableName);
return String.format(queryFormat, temporaryTableName);
}

/**
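The refactoring above folds the overwrite path and the new append path into one `SELECT * FROM <temporary table>` query job whose write disposition determines the outcome: `WRITE_TRUNCATE` replaces the destination's contents, `WRITE_APPEND` adds to them. A standalone sketch of the same idea against the `google-cloud-bigquery` client (assuming an authenticated client and a fully qualified temporary `TableId`; the connector itself goes through its own job-configuration factory):

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;

public class QueryJobSketch {
  // Copy everything from the temporary table into the destination table; the
  // write disposition decides whether the destination is truncated or appended to.
  public static void copyTemporaryInto(
      TableId temporaryTableId, TableId destinationTableId, JobInfo.WriteDisposition disposition)
      throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    String sql =
        String.format(
            "SELECT * FROM `%s.%s.%s`",  // assumes a fully qualified temporary TableId
            temporaryTableId.getProject(),
            temporaryTableId.getDataset(),
            temporaryTableId.getTable());
    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(sql)
            .setUseLegacySql(false)
            .setDestinationTable(destinationTableId)
            .setWriteDisposition(disposition)  // WRITE_TRUNCATE = overwrite, WRITE_APPEND = append
            .build();
    Job job = bigquery.create(JobInfo.of(queryConfig));
    job.waitFor();  // block until the copy finishes
  }
}
```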
@@ -52,6 +52,8 @@ public class BigQueryDirectDataWriterHelper {
private final ProtoSchema protoSchema;
private final RetrySettings retrySettings;
private final Optional<String> traceId;
private final int partitionId;
private final boolean writeAtLeastOnce;

private String writeStreamName;
private StreamWriter streamWriter;
Expand All @@ -66,18 +68,26 @@ public BigQueryDirectDataWriterHelper(
String tablePath,
ProtoSchema protoSchema,
RetrySettings bigqueryDataWriterHelperRetrySettings,
Optional<String> traceId) {
Optional<String> traceId,
int partitionId,
boolean writeAtLeastOnce) {
this.writeClient = writeClientFactory.getBigQueryWriteClient();
this.tablePath = tablePath;
this.protoSchema = protoSchema;
this.retrySettings = bigqueryDataWriterHelperRetrySettings;
this.traceId = traceId;

try {
this.writeStreamName = retryCreateWriteStream();
} catch (ExecutionException | InterruptedException e) {
throw new BigQueryConnectorException(
"Could not create write-stream after multiple retries", e);
this.partitionId = partitionId;
this.writeAtLeastOnce = writeAtLeastOnce;

if (writeAtLeastOnce) {
this.writeStreamName = this.tablePath + "/_default";
} else {
try {
this.writeStreamName = retryCreateWriteStream();
} catch (ExecutionException | InterruptedException e) {
throw new BigQueryConnectorException(
"Could not create write-stream after multiple retries", e);
}
}
this.streamWriter = createStreamWriter(this.writeStreamName);
this.protoRows = ProtoRows.newBuilder();
@@ -181,7 +191,7 @@ public void addRow(ByteString message) throws IOException {
* (deduplication error) or if the response contains an error.
*/
private void sendAppendRowsRequest() throws IOException {
long offset = writeStreamRowCount;
long offset = this.writeAtLeastOnce ? -1 : writeStreamRowCount;

ApiFuture<AppendRowsResponse> appendRowsResponseApiFuture =
streamWriter.append(protoRows.build(), offset);
@@ -216,14 +226,16 @@ private void validateAppendRowsResponse(
"Append request failed with error: " + appendRowsResponse.getError().getMessage());
}

AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
long responseOffset = appendResult.getOffset().getValue();
if (!this.writeAtLeastOnce) {
AppendRowsResponse.AppendResult appendResult = appendRowsResponse.getAppendResult();
long responseOffset = appendResult.getOffset().getValue();

if (expectedOffset != responseOffset) {
throw new IOException(
String.format(
"On stream %s append-rows response, offset %d did not match expected offset %d",
writeStreamName, responseOffset, expectedOffset));
if (expectedOffset != responseOffset) {
throw new IOException(
String.format(
"On stream %s append-rows response, offset %d did not match expected offset %d",
writeStreamName, responseOffset, expectedOffset));
}
}
}

@@ -241,25 +253,31 @@ public long finalizeStream() throws IOException {
if (this.protoRows.getSerializedRowsCount() != 0) {
sendAppendRowsRequest();
}
long responseFinalizedRowCount = writeStreamRowCount;

waitBeforeFinalization();
if (!this.writeAtLeastOnce) {
waitBeforeFinalization();

FinalizeWriteStreamRequest finalizeWriteStreamRequest =
FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
FinalizeWriteStreamResponse finalizeResponse =
retryFinalizeWriteStream(finalizeWriteStreamRequest);
FinalizeWriteStreamRequest finalizeWriteStreamRequest =
FinalizeWriteStreamRequest.newBuilder().setName(writeStreamName).build();
FinalizeWriteStreamResponse finalizeResponse =
retryFinalizeWriteStream(finalizeWriteStreamRequest);

long expectedFinalizedRowCount = writeStreamRowCount;
long responseFinalizedRowCount = finalizeResponse.getRowCount();
if (responseFinalizedRowCount != expectedFinalizedRowCount) {
throw new IOException(
String.format(
"On stream %s finalization, expected finalized row count %d but received %d",
writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
long expectedFinalizedRowCount = writeStreamRowCount;
responseFinalizedRowCount = finalizeResponse.getRowCount();
if (responseFinalizedRowCount != expectedFinalizedRowCount) {
throw new IOException(
String.format(
"On stream %s finalization, expected finalized row count %d but received %d",
writeStreamName, expectedFinalizedRowCount, responseFinalizedRowCount));
}
}

logger.debug(
"Write-stream {} finalized with row-count {}", writeStreamName, responseFinalizedRowCount);
"Write-stream {} with name {} finalized with row-count {}",
partitionId,
writeStreamName,
responseFinalizedRowCount);

clean();

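The helper now has two modes. With exactly-once semantics it creates a dedicated write-stream per partition, appends with explicit offsets, and finalizes the stream so the driver can batch-commit it; with `writeAtLeastOnce` it targets the table's default stream (`tablePath + "/_default"`), passes `-1` as the append offset, and skips finalization, because default-stream appends are committed as soon as they succeed and retries may duplicate rows. A condensed sketch of that at-least-once path (illustrative only, assuming a `tablePath`, a matching `ProtoSchema`, and one already-serialized proto row; it is not the connector's code):

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.protobuf.ByteString;

public class DefaultStreamAppendSketch {
  public static void appendOnce(String tablePath, ProtoSchema protoSchema, ByteString serializedRow)
      throws Exception {
    // The default stream is addressed by appending "/_default" to the table path.
    String defaultStreamName = tablePath + "/_default";
    StreamWriter writer =
        StreamWriter.newBuilder(defaultStreamName).setWriterSchema(protoSchema).build();
    try {
      ProtoRows rows = ProtoRows.newBuilder().addSerializedRows(serializedRow).build();
      // The default stream ignores offsets, hence -1; a retried request can write the
      // same rows twice, which is what makes this path at-least-once.
      ApiFuture<AppendRowsResponse> responseFuture = writer.append(rows, -1);
      AppendRowsResponse response = responseFuture.get();
      if (response.hasError()) {
        throw new RuntimeException("Append failed: " + response.getError().getMessage());
      }
      // No FinalizeWriteStream / BatchCommitWriteStreams: default-stream rows are
      // visible in the table as soon as the append succeeds.
    } finally {
      writer.close();
    }
  }
}
```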
@@ -116,6 +116,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
public static final String ENABLE_LIST_INFERENCE = "enableListInference";
public static final String INTERMEDIATE_FORMAT_OPTION = "intermediateFormat";
public static final String WRITE_METHOD_PARAM = "writeMethod";
public static final String WRITE_AT_LEAST_ONCE_OPTION = "writeAtLeastOnce";
@VisibleForTesting static final DataFormat DEFAULT_READ_DATA_FORMAT = DataFormat.ARROW;

@VisibleForTesting
@@ -205,6 +206,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
private SparkBigQueryProxyAndHttpConfig sparkBigQueryProxyAndHttpConfig;
private CompressionCodec arrowCompressionCodec = DEFAULT_ARROW_COMPRESSION_CODEC;
private WriteMethod writeMethod = DEFAULT_WRITE_METHOD;
boolean writeAtLeastOnce = false;
// for V2 write with BigQuery Storage Write API
RetrySettings bigqueryDataWriteHelperRetrySettings =
RetrySettings.newBuilder().setMaxAttempts(5).build();
@@ -372,6 +374,8 @@ public static SparkBigQueryConfig from(
getAnyOption(globalOptions, options, WRITE_METHOD_PARAM)
.transform(WriteMethod::from)
.or(writeMethodDefault);
config.writeAtLeastOnce =
getAnyBooleanOption(globalOptions, options, WRITE_AT_LEAST_ONCE_OPTION, false);

boolean validateSparkAvro =
config.writeMethod == WriteMethod.INDIRECT
@@ -943,6 +947,10 @@ public WriteMethod getWriteMethod() {
return writeMethod;
}

public boolean isWriteAtLeastOnce() {
return writeAtLeastOnce;
}

public Optional<String> getTraceId() {
return traceId.toJavaUtil();
}
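Because the flag is resolved through `getAnyBooleanOption(globalOptions, options, ...)`, it should also be settable once at the session level, like other global connector options such as `viewsEnabled`. A sketch under that assumption (assuming an existing `SparkSession`):

```java
import org.apache.spark.sql.SparkSession;

public class GlobalWriteAtLeastOnceConfig {
  // Set the flags once for the whole session instead of on every DataFrameWriter.
  public static void configure(SparkSession spark) {
    spark.conf().set("writeMethod", "direct");
    spark.conf().set("writeAtLeastOnce", "true");
  }
}
```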
@@ -92,7 +92,7 @@ public void insert(Dataset<Row> data, boolean overwrite) {
throw new BigQueryConnectorException(
String.format(
"It seems that %s out of %s partitions have failed, aborting",
numPartitions - writerCommitMessages.length, writerCommitMessages.length));
numPartitions - writerCommitMessages.length, numPartitions));
}
}
} catch (Exception e) {
@@ -73,7 +73,8 @@ public BigQueryDirectDataSourceWriterContext provideDirectDataSourceWriterContext
tableConfig.getEnableModeCheckForSchemaFields(),
tableConfig.getBigQueryTableLabels(),
SchemaConvertersConfiguration.from(tableConfig),
tableConfig.getKmsKeyName()); // needs to be serializable
tableConfig.getKmsKeyName(), // needs to be serializable
tableConfig.isWriteAtLeastOnce());
}

@Singleton
@@ -61,12 +61,14 @@ public class BigQueryDirectDataSourceWriterContext implements DataSourceWriterContext
private final String tablePathForBigQueryStorage;
private final SchemaConvertersConfiguration schemaConvertersConfiguration;
private final Optional<String> destinationTableKmsKeyName;
private final boolean writeAtLeastOnce;

private BigQueryWriteClient writeClient;
private Optional<TableInfo> tableInfo = Optional.absent();

enum WritingMode {
IGNORE_INPUTS,
APPEND_AT_LEAST_ONCE,
OVERWRITE,
ALL_ELSE
}
@@ -85,7 +87,8 @@ public BigQueryDirectDataSourceWriterContext(
boolean enableModeCheckForSchemaFields,
ImmutableMap<String, String> tableLabels,
SchemaConvertersConfiguration schemaConvertersConfiguration,
java.util.Optional<String> destinationTableKmsKeyName)
java.util.Optional<String> destinationTableKmsKeyName,
boolean writeAtLeastOnce)
throws IllegalArgumentException {
this.bigQueryClient = bigQueryClient;
this.writeClientFactory = bigQueryWriteClientFactory;
Expand All @@ -98,6 +101,7 @@ public BigQueryDirectDataSourceWriterContext(
this.tableLabels = tableLabels;
this.schemaConvertersConfiguration = schemaConvertersConfiguration;
this.destinationTableKmsKeyName = Optional.fromJavaUtil(destinationTableKmsKeyName);
this.writeAtLeastOnce = writeAtLeastOnce;
Schema bigQuerySchema =
SchemaConverters.from(this.schemaConvertersConfiguration).toBigQuerySchema(sparkSchema);
try {
@@ -142,6 +146,12 @@ private BigQueryTable getOrCreateTable(
"Destination table's schema is not compatible with dataframe's schema"));
switch (saveMode) {
case Append:
if (writeAtLeastOnce) {
writingMode = WritingMode.APPEND_AT_LEAST_ONCE;
return new BigQueryTable(
bigQueryClient.createTempTable(destinationTableId, bigQuerySchema).getTableId(),
true);
}
break;
case Overwrite:
writingMode = WritingMode.OVERWRITE;
@@ -177,7 +187,8 @@ public DataWriterContextFactory<InternalRow> createWriterContextFactory() {
protoSchema,
writingMode.equals(WritingMode.IGNORE_INPUTS),
bigqueryDataWriterHelperRetrySettings,
traceId);
traceId,
writeAtLeastOnce);
}

@Override
@@ -203,29 +214,35 @@ public void commit(WriterCommitMessageContext[] messages) {
writeUUID,
Arrays.toString(messages));

BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest =
BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage);
for (WriterCommitMessageContext message : messages) {
batchCommitWriteStreamsRequest.addWriteStreams(
((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName());
}
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
writeClient.batchCommitWriteStreams(batchCommitWriteStreamsRequest.build());
if (!writeAtLeastOnce) {
BatchCommitWriteStreamsRequest.Builder batchCommitWriteStreamsRequest =
BatchCommitWriteStreamsRequest.newBuilder().setParent(tablePathForBigQueryStorage);
for (WriterCommitMessageContext message : messages) {
batchCommitWriteStreamsRequest.addWriteStreams(
((BigQueryDirectWriterCommitMessageContext) message).getWriteStreamName());
}
BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
writeClient.batchCommitWriteStreams(batchCommitWriteStreamsRequest.build());

if (!batchCommitWriteStreamsResponse.hasCommitTime()) {
throw new BigQueryConnectorException(
"DataSource writer failed to batch commit its BigQuery write-streams");
}
if (!batchCommitWriteStreamsResponse.hasCommitTime()) {
throw new BigQueryConnectorException(
"DataSource writer failed to batch commit its BigQuery write-streams");
}

logger.info(
"BigQuery DataSource writer has committed at time: {}",
batchCommitWriteStreamsResponse.getCommitTime());
logger.info(
"BigQuery DataSource writer has committed at time: {}",
batchCommitWriteStreamsResponse.getCommitTime());
}

if (writingMode.equals(WritingMode.OVERWRITE)) {
Job overwriteJob =
bigQueryClient.overwriteDestinationWithTemporary(
tableToWrite.getTableId(), destinationTableId);
BigQueryClient.waitForJob(overwriteJob);
if (writingMode.equals(WritingMode.APPEND_AT_LEAST_ONCE)
|| writingMode.equals(WritingMode.OVERWRITE)) {
Job queryJob =
(writingMode.equals(WritingMode.OVERWRITE))
? bigQueryClient.overwriteDestinationWithTemporary(
tableToWrite.getTableId(), destinationTableId)
: bigQueryClient.appendDestinationWithTemporary(
tableToWrite.getTableId(), destinationTableId);
BigQueryClient.waitForJob(queryJob);
Preconditions.checkState(
bigQueryClient.deleteTable(tableToWrite.getTableId()),
new BigQueryConnectorException(
@@ -57,7 +57,8 @@ public BigQueryDirectDataWriterContext(
StructType sparkSchema,
ProtoSchema protoSchema,
RetrySettings bigqueryDataWriterHelperRetrySettings,
Optional<String> traceId) {
Optional<String> traceId,
boolean writeAtLeastOnce) {
this.partitionId = partitionId;
this.taskId = taskId;
this.epochId = epochId;
@@ -76,7 +77,9 @@ public BigQueryDirectDataWriterContext(
tablePath,
protoSchema,
bigqueryDataWriterHelperRetrySettings,
traceId);
traceId,
partitionId,
writeAtLeastOnce);
}

@Override