Skip to content

Commit

Permalink
Portable Managed BigQuery destinations (#33017)
Browse files Browse the repository at this point in the history
* managed bigqueryio

* spotless

* move managed dependency to test only

* cleanup after merging snake_case PR

* choose write method based on boundedness and pipeline options

* rename bigquery write config class

* spotless

* change read output tag to 'output'

* spotless

* revert logic that depends on DataflowServiceOptions. switching BQ methods can instead be done in Dataflow service side

* spotless

* fix typo

* separate BQ write config to a new class

* fix doc

* resolve after syncing to HEAD

* spotless

* fork on batch/streaming

* cleanup

* spotless

* portable bigquery destinations

* move forking logic to BQ schematransform side

* add file loads translation and tests; add test checks that the correct transform is chosen

* set top-level wrapper to be the underlying managed BQ transform urn; change tests to verify underlying transform name

* move unit tests to respectvie schematransform test classes

* expose to Python SDK as well

* cleanup

* address comment

* set enable_streaming_engine option; add to CHANGES
  • Loading branch information
ahmedabu98 authored Nov 13, 2024
1 parent 306c6d7 commit 50ed69a
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 145 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
* [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
* BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
* [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
* Added BigQueryIO as a Managed IO ([#31486](https://github.com/apache/beam/pull/31486))
* Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).

## New Features / Improvements
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
"--runner=DirectRunner",
"--project=${gcpProject}",
"--tempRoot=${gcpTempRoot}",
"--tempLocation=${gcpTempRoot}",
"--firestoreDb=${firestoreDb}",
"--firestoreHost=${firestoreHost}",
"--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
Expand Down Expand Up @@ -89,24 +90,27 @@ public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
PCollection<Row> rowPCollection = input.getSinglePCollection();
BigQueryIO.Write<Row> write = toWrite(input.getPipeline().getOptions());
BigQueryIO.Write<Row> write =
toWrite(rowPCollection.getSchema(), input.getPipeline().getOptions());
rowPCollection.apply(write);

return PCollectionRowTuple.empty(input.getPipeline());
}

BigQueryIO.Write<Row> toWrite(PipelineOptions options) {
BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(schema, configuration);
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.to(configuration.getTable())
.to(dynamicDestinations)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withFormatFunction(BigQueryUtils.toTableRow())
// TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
// createTempFilePrefixView() doesn't pick up the pipeline option
.withCustomGcsTempLocation(
ValueProvider.StaticValueProvider.of(options.getTempLocation()))
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.useBeamSchema();
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,22 @@
package org.apache.beam.sdk.io.gcp.bigquery.providers;

import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.DESTINATION;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.RECORD;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand All @@ -54,7 +51,6 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.joda.time.Duration;
Expand All @@ -80,6 +76,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
private static final String FAILED_ROWS_TAG = "FailedRows";
private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
// magic string that tells us to write to dynamic destinations
protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
Expand Down Expand Up @@ -176,52 +173,6 @@ private static class NoOutputDoFn<T> extends DoFn<T, Row> {
public void process(ProcessContext c) {}
}

private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
final Schema schema;
final String fixedDestination;
final List<String> primaryKey;

RowDynamicDestinations(Schema schema) {
this.schema = schema;
this.fixedDestination = null;
this.primaryKey = null;
}

public RowDynamicDestinations(
Schema schema, String fixedDestination, List<String> primaryKey) {
this.schema = schema;
this.fixedDestination = fixedDestination;
this.primaryKey = primaryKey;
}

@Override
public String getDestination(ValueInSingleWindow<Row> element) {
return Optional.ofNullable(fixedDestination)
.orElseGet(() -> element.getValue().getString("destination"));
}

@Override
public TableDestination getTable(String destination) {
return new TableDestination(destination, null);
}

@Override
public TableSchema getSchema(String destination) {
return BigQueryUtils.toTableSchema(schema);
}

@Override
public TableConstraints getTableConstraints(String destination) {
return Optional.ofNullable(this.primaryKey)
.filter(pk -> !pk.isEmpty())
.map(
pk ->
new TableConstraints()
.setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
.orElse(null);
}
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
// Check that the input exists
Expand Down Expand Up @@ -309,13 +260,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}

void validateDynamicDestinationsExpectedSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field and a \"record\" Row field.");
}

BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
Method writeMethod =
configuration.getUseAtLeastOnceSemantics() != null
Expand All @@ -326,21 +270,37 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.withMethod(writeMethod)
.withFormatFunction(BigQueryUtils.toTableRow())
.withWriteDisposition(WriteDisposition.WRITE_APPEND);

// in case CDC writes are configured we validate and include them in the configuration
if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
write = validateAndIncludeCDCInformation(write, schema);
} else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
Schema rowSchema = schema;
boolean fetchNestedRecord = false;
if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsSchema(schema);
rowSchema = schema.getField(RECORD).getType().getRowSchema();
fetchNestedRecord = true;
}
if (Boolean.TRUE.equals(configuration.getUseCdcWrites())) {
validateCdcSchema(schema);
rowSchema = schema.getField(RECORD).getType().getRowSchema();
fetchNestedRecord = true;
write =
write
.to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));
} else {
write = write.to(configuration.getTable()).useBeamSchema();
.withPrimaryKey(configuration.getPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_TYPE)),
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_SQN)));
}
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(rowSchema, configuration);
write =
write
.to(dynamicDestinations)
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(fetchNestedRecord));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
Expand All @@ -363,19 +323,27 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
return write;
}

BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
BigQueryIO.Write<Row> write, Schema schema) {
void validateDynamicDestinationsSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList(DESTINATION, RECORD)),
String.format(
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"%s\" string field and a \"%s\" Row field.",
DESTINATION, RECORD));
}

private void validateCdcSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")),
schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, RECORD)),
"When writing using CDC functionality, we expect Row Schema with a "
+ "\""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" Row field and a \"record\" Row field.");

Schema rowSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();
Schema mutationSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();

checkArgument(
rowSchema.equals(ROW_SCHEMA_MUTATION_INFO),
mutationSchema != null && mutationSchema.equals(ROW_SCHEMA_MUTATION_INFO),
"When writing using CDC functionality, we expect a \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field of Row type with schema:\n"
Expand All @@ -384,31 +352,7 @@ BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
+ "Received \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field with schema:\n"
+ rowSchema.toString());

String tableDestination = null;

if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
} else {
tableDestination = configuration.getTable();
}

return write
.to(
new RowDynamicDestinations(
schema.getField("record").getType().getRowSchema(),
tableDestination,
configuration.getPrimaryKey()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
.withPrimaryKey(configuration.getPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_TYPE)),
row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN)));
+ mutationSchema);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
package org.apache.beam.sdk.io.gcp.bigquery.providers;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Configuration for writing to BigQuery with SchemaTransforms. Used by {@link
Expand Down Expand Up @@ -68,11 +66,6 @@ public void validate() {
!Strings.isNullOrEmpty(this.getTable()),
invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");

// if we have an input table spec, validate it
if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
}

// validate create and write dispositions
String createDisposition = getCreateDisposition();
if (createDisposition != null && !createDisposition.isEmpty()) {
Expand Down Expand Up @@ -186,6 +179,21 @@ public static Builder builder() {
@Nullable
public abstract List<String> getPrimaryKey();

@SchemaFieldDescription(
"A list of field names to keep in the input record. All other fields are dropped before writing. "
+ "Is mutually exclusive with 'drop' and 'only'.")
public abstract @Nullable List<String> getKeep();

@SchemaFieldDescription(
"A list of field names to drop from the input record before writing. "
+ "Is mutually exclusive with 'keep' and 'only'.")
public abstract @Nullable List<String> getDrop();

@SchemaFieldDescription(
"The name of a single record field that should be written. "
+ "Is mutually exclusive with 'keep' and 'drop'.")
public abstract @Nullable String getOnly();

/** Builder for {@link BigQueryWriteConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
Expand All @@ -212,6 +220,12 @@ public abstract static class Builder {

public abstract Builder setPrimaryKey(List<String> pkColumns);

public abstract Builder setKeep(List<String> keep);

public abstract Builder setDrop(List<String> drop);

public abstract Builder setOnly(String only);

/** Builds a {@link BigQueryWriteConfiguration} instance. */
public abstract BigQueryWriteConfiguration build();
}
Expand Down
Loading

0 comments on commit 50ed69a

Please sign in to comment.